Compare commits


7 Commits

| Author | SHA1 | Message | Date |
|--------|------|---------|------|
| ben | 59fb881c0a | Record t1.15 task evidence | 2026-03-20 11:27:56 -04:00 |
| ben | 9104781b93 | Refactor review pipeline around normalized items | 2026-03-20 11:27:46 -04:00 |
| ben | 607c51038a | Record t1.14.3 task evidence | 2026-03-20 11:09:50 -04:00 |
| ben | bcec6b37d3 | Clean Costco normalization artifacts | 2026-03-20 11:09:44 -04:00 |
| ben | 848d229f2d | Record t1.14.2 task evidence | 2026-03-20 10:05:08 -04:00 |
| ben | d2e6f2afd3 | Align refactor paths with data layout | 2026-03-20 10:04:58 -04:00 |
|  | 424a777dd0 | added git note | 2026-03-20 09:58:25 -04:00 |
11 changed files with 663 additions and 420 deletions

View File

@@ -14,6 +14,12 @@ Run each script step-by-step from the terminal.
 6. `review_products.py`: review unresolved product matches in the terminal
 7. `report_pipeline_status.py`: show how many rows survive each stage
+Active refactor entrypoints:
+- `collect_giant_web.py`
+- `collect_costco_web.py`
+- `normalize_giant_web.py`
+- `normalize_costco_web.py`
 ## Requirements
 - Python 3.10+
@@ -30,8 +36,8 @@ pip install -r requirements.txt
 ## Optional `.env`
 Current version works best with `.env` in the project root. The scraper will prompt for these values if they are not found in the current browser session.
-- `scrape_giant` prompts if `GIANT_USER_ID` or `GIANT_LOYALTY_NUMBER` is missing.
-- `scrape_costco` tries `.env` first, then Firefox local storage for session-backed values; `COSTCO_CLIENT_IDENTIFIER` should still be set explicitly.
+- `collect_giant_web.py` prompts if `GIANT_USER_ID` or `GIANT_LOYALTY_NUMBER` is missing.
+- `collect_costco_web.py` tries `.env` first, then Firefox local storage for session-backed values; `COSTCO_CLIENT_IDENTIFIER` should still be set explicitly.
 - Costco discount matching happens later in `enrich_costco.py`; you do not need to pre-clean discount lines by hand.
 ```env
@@ -43,15 +49,39 @@ COSTCO_X_WCS_CLIENTID=...
 COSTCO_CLIENT_IDENTIFIER=...
 ```
+Current active path layout:
+```text
+data/
+  giant-web/
+    raw/
+    collected_orders.csv
+    collected_items.csv
+    normalized_items.csv
+  costco-web/
+    raw/
+    collected_orders.csv
+    collected_items.csv
+    normalized_items.csv
+  review/
+    review_queue.csv
+    review_resolutions.csv
+    product_links.csv
+    purchases.csv
+    pipeline_status.csv
+    pipeline_status.json
+  catalog.csv
+```
 ## Run Order
 Run the pipeline in this order:
 ```bash
-python scrape_giant.py
-python enrich_giant.py
-python scrape_costco.py
-python enrich_costco.py
+python collect_giant_web.py
+python normalize_giant_web.py
+python collect_costco_web.py
+python normalize_costco_web.py
 python build_purchases.py
 python review_products.py
 python build_purchases.py
@@ -79,25 +109,25 @@ python report_pipeline_status.py
 ## Key Outputs
 Giant:
-- `giant_output/orders.csv`
-- `giant_output/items.csv`
-- `giant_output/items_enriched.csv`
+- `data/giant-web/collected_orders.csv`
+- `data/giant-web/collected_items.csv`
+- `data/giant-web/normalized_items.csv`
 Costco:
-- `costco_output/orders.csv`
-- `costco_output/items.csv`
-- `costco_output/items_enriched.csv`
-- `costco_output/items_enriched.csv` now preserves raw totals and matched net discount fields
+- `data/costco-web/collected_orders.csv`
+- `data/costco-web/collected_items.csv`
+- `data/costco-web/normalized_items.csv`
+- `data/costco-web/normalized_items.csv` preserves raw totals and matched net discount fields
 Combined:
-- `combined_output/purchases.csv`
-- `combined_output/review_queue.csv`
-- `combined_output/review_resolutions.csv`
-- `combined_output/canonical_catalog.csv`
-- `combined_output/product_links.csv`
-- `combined_output/comparison_examples.csv`
-- `combined_output/pipeline_status.csv`
-- `combined_output/pipeline_status.json`
+- `data/review/purchases.csv`
+- `data/review/review_queue.csv`
+- `data/review/review_resolutions.csv`
+- `data/review/product_links.csv`
+- `data/review/comparison_examples.csv`
+- `data/review/pipeline_status.csv`
+- `data/review/pipeline_status.json`
+- `data/catalog.csv`
 ## Review Workflow
@@ -114,7 +144,7 @@ The review step is intentionally conservative:
 ## Notes
 - This project is designed around fragile retailer scraping flows, so the code favors explicit retailer-specific steps over heavy abstraction.
-- `scrape_giant.py` and `scrape_costco.py` are meant to work as standalone acquisition scripts.
+- `scrape_giant.py`, `scrape_costco.py`, `enrich_giant.py`, and `enrich_costco.py` are now legacy-compatible entrypoints; prefer the `collect_*` and `normalize_*` scripts for active work.
 - Costco discount rows are preserved for auditability and also matched back to purchased items during enrichment.
 - `validate_cross_retailer_flow.py` is a proof/check script, not a required production step.

View File

@@ -3,11 +3,8 @@ from pathlib import Path
 import click
-import build_canonical_layer
-import build_observed_products
-import validate_cross_retailer_flow
 from enrich_giant import format_decimal, to_decimal
-from layer_helpers import read_csv_rows, stable_id, write_csv_rows
+from layer_helpers import read_csv_rows, write_csv_rows
 PURCHASE_FIELDS = [
@@ -15,13 +12,18 @@ PURCHASE_FIELDS = [
"retailer", "retailer",
"order_id", "order_id",
"line_no", "line_no",
"observed_item_key", "normalized_row_id",
"observed_product_id", "normalized_item_id",
"canonical_product_id", "catalog_id",
"review_status", "review_status",
"resolution_action", "resolution_action",
"raw_item_name", "raw_item_name",
"normalized_item_name", "normalized_item_name",
"catalog_name",
"category",
"product_type",
"brand",
"variant",
"image_url", "image_url",
"retailer_item_id", "retailer_item_id",
"upc", "upc",
@@ -55,7 +57,7 @@ PURCHASE_FIELDS = [
 EXAMPLE_FIELDS = [
     "example_name",
-    "canonical_product_id",
+    "catalog_id",
     "giant_purchase_date",
     "giant_raw_item_name",
     "giant_price_per_lb",
@@ -66,8 +68,8 @@ EXAMPLE_FIELDS = [
 ]
 CATALOG_FIELDS = [
-    "canonical_product_id",
-    "canonical_name",
+    "catalog_id",
+    "catalog_name",
     "category",
     "product_type",
     "brand",
@@ -81,9 +83,20 @@ CATALOG_FIELDS = [
"updated_at", "updated_at",
] ]
PRODUCT_LINK_FIELDS = [
"normalized_item_id",
"catalog_id",
"link_method",
"link_confidence",
"review_status",
"reviewed_by",
"reviewed_at",
"link_notes",
]
RESOLUTION_FIELDS = [ RESOLUTION_FIELDS = [
"observed_product_id", "normalized_item_id",
"canonical_product_id", "catalog_id",
"resolution_action", "resolution_action",
"status", "status",
"resolution_notes", "resolution_notes",
@@ -91,10 +104,6 @@ RESOLUTION_FIELDS = [
 ]
-def decimal_or_zero(value):
-    return to_decimal(value) or Decimal("0")
 def derive_metrics(row):
     line_total = to_decimal(row.get("net_line_total") or row.get("line_total"))
     qty = to_decimal(row.get("qty"))
@@ -162,10 +171,7 @@ def derive_metrics(row):
 def order_lookup(rows, retailer):
-    return {
-        (retailer, row["order_id"]): row
-        for row in rows
-    }
+    return {(retailer, row["order_id"]): row for row in rows}
 def read_optional_csv_rows(path):
@@ -175,28 +181,10 @@ def read_optional_csv_rows(path):
     return read_csv_rows(path)
-def load_resolution_lookup(resolution_rows):
-    lookup = {}
-    for row in resolution_rows:
-        if not row.get("observed_product_id"):
-            continue
-        lookup[row["observed_product_id"]] = row
-    return lookup
-def merge_catalog_rows(existing_rows, auto_rows):
-    merged = {}
-    for row in auto_rows + existing_rows:
-        canonical_product_id = row.get("canonical_product_id", "")
-        if canonical_product_id:
-            merged[canonical_product_id] = row
-    return sorted(merged.values(), key=lambda row: row["canonical_product_id"])
-def catalog_row_from_canonical(row):
+def normalize_catalog_row(row):
     return {
-        "canonical_product_id": row.get("canonical_product_id", ""),
-        "canonical_name": row.get("canonical_name", ""),
+        "catalog_id": row.get("catalog_id") or row.get("canonical_product_id", ""),
+        "catalog_name": row.get("catalog_name") or row.get("canonical_name", ""),
         "category": row.get("category", ""),
         "product_type": row.get("product_type", ""),
         "brand": row.get("brand", ""),
@@ -211,24 +199,67 @@ def catalog_row_from_canonical(row):
     }
-def build_link_state(enriched_rows):
-    observed_rows = build_observed_products.build_observed_products(enriched_rows)
-    canonical_rows, link_rows = build_canonical_layer.build_canonical_layer(observed_rows)
-    giant_row, costco_row = validate_cross_retailer_flow.find_proof_pair(observed_rows)
-    canonical_rows, link_rows, _proof_rows = validate_cross_retailer_flow.merge_proof_pair(
-        canonical_rows,
-        link_rows,
-        giant_row,
-        costco_row,
-    )
-    observed_id_by_key = {
-        row["observed_key"]: row["observed_product_id"] for row in observed_rows
-    }
-    canonical_id_by_observed = {
-        row["observed_product_id"]: row["canonical_product_id"] for row in link_rows
-    }
-    return observed_rows, canonical_rows, link_rows, observed_id_by_key, canonical_id_by_observed
+def is_review_first_catalog_row(row):
+    notes = row.get("notes", "").strip().lower()
+    if notes.startswith("auto-linked via"):
+        return False
+    return True
+def normalize_link_row(row):
+    return {
+        "normalized_item_id": row.get("normalized_item_id", ""),
+        "catalog_id": row.get("catalog_id") or row.get("canonical_product_id", ""),
+        "link_method": row.get("link_method", ""),
+        "link_confidence": row.get("link_confidence", ""),
+        "review_status": row.get("review_status", ""),
+        "reviewed_by": row.get("reviewed_by", ""),
+        "reviewed_at": row.get("reviewed_at", ""),
+        "link_notes": row.get("link_notes", ""),
+    }
+def normalize_resolution_row(row):
+    return {
+        "normalized_item_id": row.get("normalized_item_id", ""),
+        "catalog_id": row.get("catalog_id") or row.get("canonical_product_id", ""),
+        "resolution_action": row.get("resolution_action", ""),
+        "status": row.get("status", ""),
+        "resolution_notes": row.get("resolution_notes", ""),
+        "reviewed_at": row.get("reviewed_at", ""),
+    }
+def load_resolution_lookup(resolution_rows):
+    lookup = {}
+    for row in resolution_rows:
+        normalized_row = normalize_resolution_row(row)
+        normalized_item_id = normalized_row.get("normalized_item_id", "")
+        if not normalized_item_id:
+            continue
+        lookup[normalized_item_id] = normalized_row
+    return lookup
+def merge_catalog_rows(existing_rows, new_rows):
+    merged = {}
+    for row in existing_rows + new_rows:
+        normalized_row = normalize_catalog_row(row)
+        catalog_id = normalized_row.get("catalog_id", "")
+        if catalog_id:
+            merged[catalog_id] = normalized_row
+    return sorted(merged.values(), key=lambda row: row["catalog_id"])
+def load_link_lookup(link_rows):
+    lookup = {}
+    for row in link_rows:
+        normalized_row = normalize_link_row(row)
+        normalized_item_id = normalized_row.get("normalized_item_id", "")
+        if not normalized_item_id:
+            continue
+        lookup[normalized_item_id] = normalized_row
+    return lookup
 def build_purchase_rows(
@@ -237,25 +268,37 @@ def build_purchase_rows(
     giant_orders,
     costco_orders,
     resolution_rows,
+    link_rows=None,
+    catalog_rows=None,
 ):
     all_enriched_rows = giant_enriched_rows + costco_enriched_rows
-    (
-        observed_rows,
-        canonical_rows,
-        link_rows,
-        observed_id_by_key,
-        canonical_id_by_observed,
-    ) = build_link_state(all_enriched_rows)
     resolution_lookup = load_resolution_lookup(resolution_rows)
-    for observed_product_id, resolution in resolution_lookup.items():
+    link_lookup = load_link_lookup(link_rows or [])
+    catalog_lookup = {
+        row["catalog_id"]: normalize_catalog_row(row)
+        for row in (catalog_rows or [])
+        if normalize_catalog_row(row).get("catalog_id")
+    }
+    for normalized_item_id, resolution in resolution_lookup.items():
         action = resolution.get("resolution_action", "")
         status = resolution.get("status", "")
         if status != "approved":
             continue
-        if action in {"link", "create"} and resolution.get("canonical_product_id"):
-            canonical_id_by_observed[observed_product_id] = resolution["canonical_product_id"]
+        if action in {"link", "create"} and resolution.get("catalog_id"):
+            link_lookup[normalized_item_id] = {
+                "normalized_item_id": normalized_item_id,
+                "catalog_id": resolution["catalog_id"],
+                "link_method": f"manual_{action}",
+                "link_confidence": "high",
+                "review_status": status,
+                "reviewed_by": "",
+                "reviewed_at": resolution.get("reviewed_at", ""),
+                "link_notes": resolution.get("resolution_notes", ""),
+            }
         elif action == "exclude":
-            canonical_id_by_observed[observed_product_id] = ""
+            link_lookup.pop(normalized_item_id, None)
     orders_by_id = {}
     orders_by_id.update(order_lookup(giant_orders, "giant"))
     orders_by_id.update(order_lookup(costco_orders, "costco"))
@@ -265,24 +308,30 @@ def build_purchase_rows(
         all_enriched_rows,
         key=lambda item: (item["order_date"], item["retailer"], item["order_id"], int(item["line_no"])),
     ):
-        observed_key = build_observed_products.build_observed_key(row)
-        observed_product_id = observed_id_by_key.get(observed_key, "")
+        normalized_item_id = row.get("normalized_item_id", "")
+        resolution = resolution_lookup.get(normalized_item_id, {})
+        link_row = link_lookup.get(normalized_item_id, {})
+        catalog_row = catalog_lookup.get(link_row.get("catalog_id", ""), {})
         order_row = orders_by_id.get((row["retailer"], row["order_id"]), {})
         metrics = derive_metrics(row)
-        resolution = resolution_lookup.get(observed_product_id, {})
         purchase_rows.append(
             {
                 "purchase_date": row["order_date"],
                 "retailer": row["retailer"],
                 "order_id": row["order_id"],
                 "line_no": row["line_no"],
-                "observed_item_key": row["observed_item_key"],
-                "observed_product_id": observed_product_id,
-                "canonical_product_id": canonical_id_by_observed.get(observed_product_id, ""),
+                "normalized_row_id": row.get("normalized_row_id", ""),
+                "normalized_item_id": normalized_item_id,
+                "catalog_id": link_row.get("catalog_id", ""),
                 "review_status": resolution.get("status", ""),
                 "resolution_action": resolution.get("resolution_action", ""),
                 "raw_item_name": row["item_name"],
                 "normalized_item_name": row["item_name_norm"],
+                "catalog_name": catalog_row.get("catalog_name", ""),
+                "category": catalog_row.get("category", ""),
+                "product_type": catalog_row.get("product_type", ""),
+                "brand": catalog_row.get("brand", ""),
+                "variant": catalog_row.get("variant", ""),
                 "image_url": row.get("image_url", ""),
                 "retailer_item_id": row["retailer_item_id"],
                 "upc": row["upc"],
@@ -307,33 +356,7 @@ def build_purchase_rows(
             **metrics,
         }
     )
-    return purchase_rows, observed_rows, canonical_rows, link_rows
-def apply_manual_resolutions_to_links(link_rows, resolution_rows):
-    link_by_observed = {row["observed_product_id"]: dict(row) for row in link_rows}
-    for resolution in resolution_rows:
-        if resolution.get("status") != "approved":
-            continue
-        observed_product_id = resolution.get("observed_product_id", "")
-        action = resolution.get("resolution_action", "")
-        if not observed_product_id:
-            continue
-        if action == "exclude":
-            link_by_observed.pop(observed_product_id, None)
-            continue
-        if action in {"link", "create"} and resolution.get("canonical_product_id"):
-            link_by_observed[observed_product_id] = {
-                "observed_product_id": observed_product_id,
-                "canonical_product_id": resolution["canonical_product_id"],
-                "link_method": f"manual_{action}",
-                "link_confidence": "high",
-                "review_status": resolution.get("status", ""),
-                "reviewed_by": "",
-                "reviewed_at": resolution.get("reviewed_at", ""),
-                "link_notes": resolution.get("resolution_notes", ""),
-            }
-    return sorted(link_by_observed.values(), key=lambda row: row["observed_product_id"])
+    return purchase_rows, sorted(link_lookup.values(), key=lambda row: row["normalized_item_id"])
 def build_comparison_examples(purchase_rows):
@@ -342,7 +365,7 @@ def build_comparison_examples(purchase_rows):
     for row in purchase_rows:
         if row.get("normalized_item_name") != "BANANA":
             continue
-        if not row.get("canonical_product_id"):
+        if not row.get("catalog_id"):
             continue
         if row["retailer"] == "giant" and row.get("price_per_lb"):
             giant_banana = row
@@ -355,7 +378,7 @@ def build_comparison_examples(purchase_rows):
     return [
         {
             "example_name": "banana_price_per_lb",
-            "canonical_product_id": giant_banana["canonical_product_id"],
+            "catalog_id": giant_banana["catalog_id"],
             "giant_purchase_date": giant_banana["purchase_date"],
             "giant_raw_item_name": giant_banana["raw_item_name"],
             "giant_price_per_lb": giant_banana["price_per_lb"],
@@ -368,15 +391,15 @@ def build_comparison_examples(purchase_rows):
 @click.command()
-@click.option("--giant-items-enriched-csv", default="giant_output/items_enriched.csv", show_default=True)
-@click.option("--costco-items-enriched-csv", default="costco_output/items_enriched.csv", show_default=True)
-@click.option("--giant-orders-csv", default="giant_output/orders.csv", show_default=True)
-@click.option("--costco-orders-csv", default="costco_output/orders.csv", show_default=True)
-@click.option("--resolutions-csv", default="combined_output/review_resolutions.csv", show_default=True)
-@click.option("--catalog-csv", default="combined_output/canonical_catalog.csv", show_default=True)
-@click.option("--links-csv", default="combined_output/product_links.csv", show_default=True)
-@click.option("--output-csv", default="combined_output/purchases.csv", show_default=True)
-@click.option("--examples-csv", default="combined_output/comparison_examples.csv", show_default=True)
+@click.option("--giant-items-enriched-csv", default="data/giant-web/normalized_items.csv", show_default=True)
+@click.option("--costco-items-enriched-csv", default="data/costco-web/normalized_items.csv", show_default=True)
+@click.option("--giant-orders-csv", default="data/giant-web/collected_orders.csv", show_default=True)
+@click.option("--costco-orders-csv", default="data/costco-web/collected_orders.csv", show_default=True)
+@click.option("--resolutions-csv", default="data/review/review_resolutions.csv", show_default=True)
+@click.option("--catalog-csv", default="data/catalog.csv", show_default=True)
+@click.option("--links-csv", default="data/review/product_links.csv", show_default=True)
+@click.option("--output-csv", default="data/review/purchases.csv", show_default=True)
+@click.option("--examples-csv", default="data/review/comparison_examples.csv", show_default=True)
 def main(
     giant_items_enriched_csv,
     costco_items_enriched_csv,
@@ -389,27 +412,29 @@ def main(
     examples_csv,
 ):
     resolution_rows = read_optional_csv_rows(resolutions_csv)
-    purchase_rows, _observed_rows, canonical_rows, link_rows = build_purchase_rows(
+    catalog_rows = merge_catalog_rows(
+        [row for row in read_optional_csv_rows(catalog_csv) if is_review_first_catalog_row(row)],
+        [],
+    )
+    existing_links = [normalize_link_row(row) for row in read_optional_csv_rows(links_csv)]
+    purchase_rows, link_rows = build_purchase_rows(
         read_csv_rows(giant_items_enriched_csv),
         read_csv_rows(costco_items_enriched_csv),
         read_csv_rows(giant_orders_csv),
         read_csv_rows(costco_orders_csv),
         resolution_rows,
+        existing_links,
+        catalog_rows,
     )
-    existing_catalog_rows = read_optional_csv_rows(catalog_csv)
-    merged_catalog_rows = merge_catalog_rows(
-        existing_catalog_rows,
-        [catalog_row_from_canonical(row) for row in canonical_rows],
-    )
-    link_rows = apply_manual_resolutions_to_links(link_rows, resolution_rows)
     example_rows = build_comparison_examples(purchase_rows)
-    write_csv_rows(catalog_csv, merged_catalog_rows, CATALOG_FIELDS)
-    write_csv_rows(links_csv, link_rows, build_canonical_layer.LINK_FIELDS)
+    write_csv_rows(catalog_csv, catalog_rows, CATALOG_FIELDS)
+    write_csv_rows(links_csv, link_rows, PRODUCT_LINK_FIELDS)
     write_csv_rows(output_csv, purchase_rows, PURCHASE_FIELDS)
     write_csv_rows(examples_csv, example_rows, EXAMPLE_FIELDS)
     click.echo(
         f"wrote {len(purchase_rows)} purchase rows to {output_csv}, "
-        f"{len(merged_catalog_rows)} catalog rows to {catalog_csv}, "
+        f"{len(catalog_rows)} catalog rows to {catalog_csv}, "
+        f"{len(link_rows)} product links to {links_csv}, "
         f"and {len(example_rows)} comparison examples to {examples_csv}"
     )
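The manual-resolution override in `build_purchase_rows` can be sketched in isolation. This is a simplified sketch, assuming dict-shaped link and resolution rows like those in the diff above; the helper name `apply_resolutions` and the trimmed link fields are illustrative, not part of the project:

```python
def apply_resolutions(link_lookup, resolution_rows):
    """Apply approved manual resolutions on top of existing product links.

    Simplified version of the override logic in build_purchase_rows:
    approved "link"/"create" actions add or replace a product link,
    while approved "exclude" actions drop any existing link.
    """
    for resolution in resolution_rows:
        if resolution.get("status") != "approved":
            continue  # pending or rejected resolutions are ignored
        item_id = resolution.get("normalized_item_id", "")
        action = resolution.get("resolution_action", "")
        if action in {"link", "create"} and resolution.get("catalog_id"):
            link_lookup[item_id] = {
                "normalized_item_id": item_id,
                "catalog_id": resolution["catalog_id"],
                "link_method": f"manual_{action}",
            }
        elif action == "exclude":
            link_lookup.pop(item_id, None)
    return link_lookup


links = {"item-1": {"normalized_item_id": "item-1", "catalog_id": "cat-9", "link_method": "auto"}}
resolutions = [
    {"normalized_item_id": "item-1", "resolution_action": "exclude", "status": "approved"},
    {"normalized_item_id": "item-2", "resolution_action": "link", "catalog_id": "cat-3", "status": "approved"},
    {"normalized_item_id": "item-3", "resolution_action": "link", "catalog_id": "cat-4", "status": "pending"},
]
result = apply_resolutions(links, resolutions)
# item-1 excluded, item-2 manually linked, item-3 untouched (not approved)
```

The key property this preserves from the diff is that manual review state always wins over pre-existing links.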

View File

@@ -30,6 +30,11 @@ CODE_TOKEN_RE = re.compile(
 )
 PACK_FRACTION_RE = re.compile(r"(?<![A-Z0-9])(\d+)\s*/\s*(\d+(?:\.\d+)?)\s*(OZ|LB|LBS|CT)\b")
 HASH_SIZE_RE = re.compile(r"(?<![A-Z0-9])(\d+(?:\.\d+)?)#\b")
+ITEM_CODE_RE = re.compile(r"#\w+\b")
+DUAL_WEIGHT_RE = re.compile(
+    r"\b\d+(?:\.\d+)?\s*(?:KG|G|LB|LBS|OZ)\s*/\s*\d+(?:\.\d+)?\s*(?:KG|G|LB|LBS|OZ)\b"
+)
+LOGISTICS_SLASH_RE = re.compile(r"\b(?:T\d+/H\d+(?:/P\d+)?/?|H\d+/P\d+/?|T\d+/H\d+/?)\b")
 PACK_DASH_RE = re.compile(r"(?<![A-Z0-9])(\d+)\s*-\s*PACK\b")
 PACK_WORD_RE = re.compile(r"(?<![A-Z0-9])(\d+)\s*PACK\b")
 SIZE_RE = re.compile(r"(?<![A-Z0-9])(\d+(?:\.\d+)?)\s*(OZ|LB|LBS|CT|KG|G)\b")
@@ -98,12 +103,17 @@ def normalize_costco_name(cleaned_name):
         base = PACK_FRACTION_RE.sub(" ", base)
     else:
         base = SIZE_RE.sub(" ", base)
+    base = DUAL_WEIGHT_RE.sub(" ", base)
     base = HASH_SIZE_RE.sub(" ", base)
+    base = ITEM_CODE_RE.sub(" ", base)
+    base = LOGISTICS_SLASH_RE.sub(" ", base)
     base = PACK_DASH_RE.sub(" ", base)
     base = PACK_WORD_RE.sub(" ", base)
     base = normalize_whitespace(base)
     tokens = []
     for token in base.split():
+        if token in {"/", "-"}:
+            continue
         if token in {"ORG"}:
             continue
         if token in {"PEANUT", "BUTTER"} and "JIF" in base:
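The effect of the three new cleanup regexes can be demonstrated on the examples cited in the task-log evidence. This is a standalone approximation, not the real `normalize_costco_name`: the actual function also does size/pack extraction, whitespace normalization, and further token filtering (e.g. the `MANDARINS` → `MANDARIN` singularization happens elsewhere), all omitted here:

```python
import re

# same patterns as added in the diff above
ITEM_CODE_RE = re.compile(r"#\w+\b")
DUAL_WEIGHT_RE = re.compile(
    r"\b\d+(?:\.\d+)?\s*(?:KG|G|LB|LBS|OZ)\s*/\s*\d+(?:\.\d+)?\s*(?:KG|G|LB|LBS|OZ)\b"
)
LOGISTICS_SLASH_RE = re.compile(r"\b(?:T\d+/H\d+(?:/P\d+)?/?|H\d+/P\d+/?|T\d+/H\d+/?)\b")


def clean_name(name):
    # strip dual-unit weights ("2.27 KG / 5 LBS"), item codes ("#80873U"),
    # and logistics fragments ("T12/H3/P36"), then drop orphan "/" and "-" tokens
    base = DUAL_WEIGHT_RE.sub(" ", name)
    base = ITEM_CODE_RE.sub(" ", base)
    base = LOGISTICS_SLASH_RE.sub(" ", base)
    tokens = [t for t in base.split() if t not in {"/", "-"}]
    return " ".join(tokens)


print(clean_name("MANDARINS 2.27 KG / 5 LBS"))              # MANDARINS
print(clean_name("LIFE 6'TABLE MDL #80873U - T12/H3/P36"))  # LIFE 6'TABLE MDL
```

Note the design choice the pm note asked for: each pattern targets a specific known receipt artifact rather than a broad heuristic.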

View File

@@ -27,6 +27,7 @@ carry forward image url
 3. build observed-product table from enriched items
 * git issues
+- don't try to git push from win emacs viewing wsl, it will be screwy (windows identity vs wsl)
 ** ssh / access to gitea
 ssh://git@192.168.1.207:2020/ben/scrape-giant.git
@@ -72,11 +73,11 @@ put point on the commit; highlighted remote gitea/cx
 X : reset branch; prompts you, selected cx
 ** merge branch
 b b : switch to branch to be merged into (cx)
 m m : pick branch to merge into current branch
 * giant requests
 ** item:
 get:

View File

@@ -1,5 +1,5 @@
 #+title: Scrape-Giant Task Log
+#+STARTUP: overview
 * [X] t1.1: harden giant receipt fetch cli (2-4 commits)
 ** acceptance criteria
 - giant scraper runs from cli with prompts or env-backed defaults for `user_id` and `loyalty`
@@ -546,7 +546,7 @@ make Giant and Costco emit the shared normalized line-item schema without introd
 - `normalized_item_id` is always present, but it only collapses repeated rows when the evidence is strong; otherwise it falls back to row-level identity via `normalized_row_id`.
 - Added `normalize_*` entry points for the new data-model layout while leaving the legacy `enrich_*` commands available during the transition.
-* [ ] t1.14.2: finalize filesystem and schema alignment for the refactor (2-4 commits)
+* [X] t1.14.2: finalize filesystem and schema alignment for the refactor (2-4 commits)
 bring on-disk outputs fully into the target `data/` structure without changing retailer behavior
 ** Acceptance Criteria
@@ -578,13 +578,16 @@ bring on-disk outputs fully into the target `data/` structure without changing r
 - pm note: this is a structure-alignment task, not a retailer parsing task
 ** evidence
-- commit:
-- tests:
-- datetime:
+- commit: `d2e6f2a`
+- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python build_purchases.py`; `./venv/bin/python review_products.py --refresh-only`; `./venv/bin/python report_pipeline_status.py`; `./venv/bin/python build_purchases.py --help`; `./venv/bin/python review_products.py --help`; `./venv/bin/python report_pipeline_status.py --help`; verified `data/giant-web/collected_orders.csv`, `data/giant-web/collected_items.csv`, `data/costco-web/collected_orders.csv`, `data/costco-web/collected_items.csv`, `data/catalog.csv`, and archived transitional review outputs under `data/review/archive/`
+- datetime: [2026-03-20 10:04:15 EDT]
 ** notes
+- No recollection was needed; existing raw and collected exports were adapted in place and moved into the target names.
+- Updated the active script defaults to point at `data/...` so the code and on-disk layout now agree.
+- Kept obviously obsolete review artifacts, but moved them under `data/review/archive/` instead of deleting them outright.
-* [ ] t1.14.3: retailer-specific Costco normalization cleanup (2-4 commits)
+* [X] t1.14.3: retailer-specific Costco normalization cleanup (2-4 commits)
 tighten Costco-specific normalization so normalized item names are cleaner and deterministic retailer grouping is less noisy
 ** Acceptance Criteria
@@ -612,13 +615,16 @@ tighten Costco-specific normalization so normalized item names are cleaner and d
 - pm note: prefer a short allowlist/blocklist of known receipt artifacts over broad heuristics
 ** evidence
-- commit:
-- tests:
-- datetime:
+- commit: `bcec6b3`
+- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python -m unittest tests.test_costco_pipeline`; `./venv/bin/python normalize_costco_web.py`; verified live cleaned examples in `data/costco-web/normalized_items.csv`, including `MANDARINS 2.27 KG / 5 LBS -> MANDARIN` and `LIFE 6'TABLE MDL #80873U - T12/H3/P36 -> LIFE 6'TABLE MDL`
+- datetime: 2026-03-20 11:09:32 EDT
 ** notes
+- Kept this explicitly Costco-specific and narrow: the cleanup removes known logistics/code artifacts and orphan slash tokens without introducing fuzzy naming logic.
+- The structured parsing still owns size/pack extraction, so name cleanup can safely strip dual-unit and logistics fragments after those fields are parsed.
+- Discount-line behavior remains unchanged; this task only cleaned normalized names and preserved the existing audit trail.
-* [ ] t1.15: refactor review/combine pipeline around normalized_item_id and catalog links (4-8 commits)
+* [X] t1.15: refactor review/combine pipeline around normalized_item_id and catalog links (4-8 commits)
 replace the old observed/canonical workflow with a review-first pipeline that uses normalized_item_id as the retailer-level review unit and links it to catalog items
 ** Acceptance Criteria
@@ -661,11 +667,15 @@ replace the old observed/canonical workflow with a review-first pipeline that us
9. pm note: keep review/combine auditable; each catalog link should be explainable from normalized rows and review state 9. pm note: keep review/combine auditable; each catalog link should be explainable from normalized rows and review state
** evidence ** evidence
- commit: - commit: `9104781`
- tests: - tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python build_purchases.py`; `./venv/bin/python review_products.py --refresh-only`; `./venv/bin/python report_pipeline_status.py`; `./venv/bin/python build_purchases.py --help`; `./venv/bin/python review_products.py --help`; `./venv/bin/python report_pipeline_status.py --help`
- datetime: - datetime: 2026-03-20 11:27:12 EDT
** notes ** notes
- The old observed/canonical auto-layer is no longer in the active review/combine path. `build_purchases.py`, `review_products.py`, and `report_pipeline_status.py` now operate on `normalized_item_id`, `catalog_id`, and `catalog_name`.
- I kept the review CLI shape intentionally close to the pre-refactor flow so the project only changed its identity model, not the operator workflow.
- Existing auto-generated catalog rows are no longer carried forward by default; only deliberate catalog entries survive. That keeps the new `catalog.csv` conservative, but it also means prior observed-based auto-links do not migrate into the new model.
- Live rerun after the refactor produced `627` purchase rows, `387` review-queue rows, `407` distinct normalized items, `0` linked normalized items, and `0` unresolved rows missing from the review queue.
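The identity model behind those counts can be sketched in a few lines. This is a simplified assumption based on the notes, not the project's actual code: purchase rows carry a retailer-level `normalized_item_id`, and approved resolutions map that id to a `catalog_id`.

```python
from collections import defaultdict


def summarize_linkage(purchase_rows, resolution_rows):
    """Count distinct, linked, and unresolved normalized items (sketch)."""
    # Approved resolutions are the only path from a normalized item to the catalog.
    approved = {
        row["normalized_item_id"]: row["catalog_id"]
        for row in resolution_rows
        if row.get("status") == "approved" and row.get("catalog_id")
    }
    by_item = defaultdict(list)
    for row in purchase_rows:
        item_id = row.get("normalized_item_id")
        if item_id:
            by_item[item_id].append(row)
    linked = {item_id for item_id in by_item if item_id in approved}
    return {
        "distinct_normalized_items": len(by_item),
        "linked_normalized_items": len(linked),
        "unresolved_normalized_items": len(by_item) - len(linked),
    }
```

Because every link flows through an explicit resolution row, each catalog association stays explainable from the normalized rows and review state, which is the auditability property the pm note asks for.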
* [ ] t1.10: add optional llm-assisted suggestion workflow for unresolved normalized retailer items (2-4 commits)
@@ -3,7 +3,6 @@ from pathlib import Path
import click import click
import build_observed_products
import build_purchases import build_purchases
import review_products import review_products
from layer_helpers import read_csv_rows, write_csv_rows from layer_helpers import read_csv_rows, write_csv_rows
@@ -29,33 +28,36 @@ def build_status_summary(
purchases, purchases,
resolutions, resolutions,
): ):
enriched_rows = giant_enriched + costco_enriched normalized_rows = giant_enriched + costco_enriched
observed_rows = build_observed_products.build_observed_products(enriched_rows)
queue_rows = review_products.build_review_queue(purchases, resolutions) queue_rows = review_products.build_review_queue(purchases, resolutions)
queue_ids = {row["normalized_item_id"] for row in queue_rows}
unresolved_purchase_rows = [ unresolved_purchase_rows = [
row row
for row in purchases for row in purchases
if row.get("observed_product_id") if row.get("normalized_item_id")
and not row.get("canonical_product_id") and not row.get("catalog_id")
and row.get("is_fee") != "true" and row.get("is_fee") != "true"
and row.get("is_discount_line") != "true" and row.get("is_discount_line") != "true"
and row.get("is_coupon_line") != "true" and row.get("is_coupon_line") != "true"
] ]
excluded_rows = [ excluded_rows = [row for row in purchases if row.get("resolution_action") == "exclude"]
row linked_purchase_rows = [row for row in purchases if row.get("catalog_id")]
for row in purchases distinct_normalized_items = {
if row.get("resolution_action") == "exclude" row["normalized_item_id"] for row in normalized_rows if row.get("normalized_item_id")
] }
linked_purchase_rows = [row for row in purchases if row.get("canonical_product_id")] linked_normalized_items = {
row["normalized_item_id"] for row in purchases if row.get("normalized_item_id") and row.get("catalog_id")
}
summary = [ summary = [
{"stage": "raw_orders", "count": len(giant_orders) + len(costco_orders)}, {"stage": "raw_orders", "count": len(giant_orders) + len(costco_orders)},
{"stage": "raw_items", "count": len(giant_items) + len(costco_items)}, {"stage": "raw_items", "count": len(giant_items) + len(costco_items)},
{"stage": "enriched_items", "count": len(enriched_rows)}, {"stage": "normalized_items", "count": len(normalized_rows)},
{"stage": "observed_products", "count": len(observed_rows)}, {"stage": "distinct_normalized_items", "count": len(distinct_normalized_items)},
{"stage": "review_queue_observed_products", "count": len(queue_rows)}, {"stage": "review_queue_normalized_items", "count": len(queue_rows)},
{"stage": "canonical_linked_purchase_rows", "count": len(linked_purchase_rows)}, {"stage": "linked_normalized_items", "count": len(linked_normalized_items)},
{"stage": "linked_purchase_rows", "count": len(linked_purchase_rows)},
{"stage": "final_purchase_rows", "count": len(purchases)}, {"stage": "final_purchase_rows", "count": len(purchases)},
{"stage": "unresolved_purchase_rows", "count": len(unresolved_purchase_rows)}, {"stage": "unresolved_purchase_rows", "count": len(unresolved_purchase_rows)},
{"stage": "excluded_purchase_rows", "count": len(excluded_rows)}, {"stage": "excluded_purchase_rows", "count": len(excluded_rows)},
@@ -65,8 +67,7 @@ def build_status_summary(
[ [
row row
for row in unresolved_purchase_rows for row in unresolved_purchase_rows
if row.get("observed_product_id") if row.get("normalized_item_id") not in queue_ids
not in {queue_row["observed_product_id"] for queue_row in queue_rows}
] ]
), ),
}, },
@@ -75,16 +76,16 @@ def build_status_summary(
@click.command() @click.command()
@click.option("--giant-orders-csv", default="giant_output/orders.csv", show_default=True) @click.option("--giant-orders-csv", default="data/giant-web/collected_orders.csv", show_default=True)
@click.option("--giant-items-csv", default="giant_output/items.csv", show_default=True) @click.option("--giant-items-csv", default="data/giant-web/collected_items.csv", show_default=True)
@click.option("--giant-enriched-csv", default="giant_output/items_enriched.csv", show_default=True) @click.option("--giant-enriched-csv", default="data/giant-web/normalized_items.csv", show_default=True)
@click.option("--costco-orders-csv", default="costco_output/orders.csv", show_default=True) @click.option("--costco-orders-csv", default="data/costco-web/collected_orders.csv", show_default=True)
@click.option("--costco-items-csv", default="costco_output/items.csv", show_default=True) @click.option("--costco-items-csv", default="data/costco-web/collected_items.csv", show_default=True)
@click.option("--costco-enriched-csv", default="costco_output/items_enriched.csv", show_default=True) @click.option("--costco-enriched-csv", default="data/costco-web/normalized_items.csv", show_default=True)
@click.option("--purchases-csv", default="combined_output/purchases.csv", show_default=True) @click.option("--purchases-csv", default="data/review/purchases.csv", show_default=True)
@click.option("--resolutions-csv", default="combined_output/review_resolutions.csv", show_default=True) @click.option("--resolutions-csv", default="data/review/review_resolutions.csv", show_default=True)
@click.option("--summary-csv", default="combined_output/pipeline_status.csv", show_default=True) @click.option("--summary-csv", default="data/review/pipeline_status.csv", show_default=True)
@click.option("--summary-json", default="combined_output/pipeline_status.json", show_default=True) @click.option("--summary-json", default="data/review/pipeline_status.json", show_default=True)
def main( def main(
giant_orders_csv, giant_orders_csv,
giant_items_csv, giant_items_csv,
@@ -105,7 +106,7 @@ def main(
read_rows_if_exists(costco_items_csv), read_rows_if_exists(costco_items_csv),
read_rows_if_exists(costco_enriched_csv), read_rows_if_exists(costco_enriched_csv),
read_rows_if_exists(purchases_csv), read_rows_if_exists(purchases_csv),
read_rows_if_exists(resolutions_csv), [build_purchases.normalize_resolution_row(row) for row in read_rows_if_exists(resolutions_csv)],
) )
write_csv_rows(summary_csv, summary_rows, SUMMARY_FIELDS) write_csv_rows(summary_csv, summary_rows, SUMMARY_FIELDS)
summary_json_path = Path(summary_json) summary_json_path = Path(summary_json)
@@ -10,8 +10,8 @@ from layer_helpers import compact_join, stable_id, write_csv_rows
QUEUE_FIELDS = [ QUEUE_FIELDS = [
"review_id", "review_id",
"retailer", "retailer",
"observed_product_id", "normalized_item_id",
"canonical_product_id", "catalog_id",
"reason_code", "reason_code",
"priority", "priority",
"raw_item_names", "raw_item_names",
@@ -26,36 +26,49 @@ QUEUE_FIELDS = [
"updated_at", "updated_at",
] ]
INFO_COLOR = "cyan"
PROMPT_COLOR = "bright_yellow"
WARNING_COLOR = "magenta"
def build_review_queue(purchase_rows, resolution_rows): def build_review_queue(purchase_rows, resolution_rows):
by_observed = defaultdict(list) by_normalized = defaultdict(list)
resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows) resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows)
for row in purchase_rows: for row in purchase_rows:
observed_product_id = row.get("observed_product_id", "") normalized_item_id = row.get("normalized_item_id", "")
if not observed_product_id: if not normalized_item_id:
continue continue
by_observed[observed_product_id].append(row) by_normalized[normalized_item_id].append(row)
today_text = str(date.today()) today_text = str(date.today())
queue_rows = [] queue_rows = []
for observed_product_id, rows in sorted(by_observed.items()): for normalized_item_id, rows in sorted(by_normalized.items()):
current_resolution = resolution_lookup.get(observed_product_id, {}) current_resolution = resolution_lookup.get(normalized_item_id, {})
if current_resolution.get("status") == "approved": if current_resolution.get("status") == "approved":
continue continue
unresolved_rows = [row for row in rows if not row.get("canonical_product_id")]
unresolved_rows = [
row
for row in rows
if not row.get("catalog_id")
and row.get("is_item", "true") != "false"
and row.get("is_fee") != "true"
and row.get("is_discount_line") != "true"
and row.get("is_coupon_line") != "true"
]
if not unresolved_rows: if not unresolved_rows:
continue continue
retailers = sorted({row["retailer"] for row in rows}) retailers = sorted({row["retailer"] for row in rows})
review_id = stable_id("rvw", observed_product_id) review_id = stable_id("rvw", normalized_item_id)
queue_rows.append( queue_rows.append(
{ {
"review_id": review_id, "review_id": review_id,
"retailer": " | ".join(retailers), "retailer": " | ".join(retailers),
"observed_product_id": observed_product_id, "normalized_item_id": normalized_item_id,
"canonical_product_id": current_resolution.get("canonical_product_id", ""), "catalog_id": current_resolution.get("catalog_id", ""),
"reason_code": "missing_canonical_link", "reason_code": "missing_catalog_link",
"priority": "high", "priority": "high",
"raw_item_names": compact_join( "raw_item_names": compact_join(
sorted({row["raw_item_name"] for row in rows if row["raw_item_name"]}), sorted({row["raw_item_name"] for row in rows if row["raw_item_name"]}),
@@ -98,11 +111,6 @@ def save_catalog_rows(path, rows):
write_csv_rows(path, rows, build_purchases.CATALOG_FIELDS) write_csv_rows(path, rows, build_purchases.CATALOG_FIELDS)
INFO_COLOR = "cyan"
PROMPT_COLOR = "bright_yellow"
WARNING_COLOR = "magenta"
def sort_related_items(rows): def sort_related_items(rows):
return sorted( return sorted(
rows, rows,
@@ -115,7 +123,7 @@ def sort_related_items(rows):
) )
def build_canonical_suggestions(related_rows, catalog_rows, limit=3): def build_catalog_suggestions(related_rows, purchase_rows, catalog_rows, limit=3):
normalized_names = { normalized_names = {
row.get("normalized_item_name", "").strip().upper() row.get("normalized_item_name", "").strip().upper()
for row in related_rows for row in related_rows
@@ -126,56 +134,52 @@ def build_canonical_suggestions(related_rows, catalog_rows, limit=3):
for row in related_rows for row in related_rows
if row.get("upc", "").strip() if row.get("upc", "").strip()
} }
catalog_by_id = {
row.get("catalog_id", ""): row for row in catalog_rows if row.get("catalog_id", "")
}
suggestions = [] suggestions = []
seen_ids = set() seen_ids = set()
def add_matches(rows, reason): def add_catalog_id(catalog_id, reason):
for row in rows: if not catalog_id or catalog_id in seen_ids or catalog_id not in catalog_by_id:
canonical_product_id = row.get("canonical_product_id", "") return False
if not canonical_product_id or canonical_product_id in seen_ids: seen_ids.add(catalog_id)
continue catalog_row = catalog_by_id[catalog_id]
seen_ids.add(canonical_product_id)
suggestions.append( suggestions.append(
{ {
"canonical_product_id": canonical_product_id, "catalog_id": catalog_id,
"canonical_name": row.get("canonical_name", ""), "catalog_name": catalog_row.get("catalog_name", ""),
"reason": reason, "reason": reason,
} }
) )
if len(suggestions) >= limit: return len(suggestions) >= limit
return True
return False
exact_upc_rows = [ reviewed_purchase_rows = [
row row for row in purchase_rows if row.get("catalog_id") and row.get("normalized_item_id")
for row in catalog_rows
if row.get("upc", "").strip() and row.get("upc", "").strip() in upcs
] ]
if add_matches(exact_upc_rows, "exact upc"): for row in reviewed_purchase_rows:
if row.get("upc", "").strip() and row.get("upc", "").strip() in upcs:
if add_catalog_id(row.get("catalog_id", ""), "exact upc"):
return suggestions return suggestions
exact_name_rows = [ for row in reviewed_purchase_rows:
row if row.get("normalized_item_name", "").strip().upper() in normalized_names:
for row in catalog_rows if add_catalog_id(row.get("catalog_id", ""), "exact normalized name"):
if row.get("canonical_name", "").strip().upper() in normalized_names
]
if add_matches(exact_name_rows, "exact normalized name"):
return suggestions return suggestions
contains_rows = [] for catalog_row in catalog_rows:
for row in catalog_rows: catalog_name = catalog_row.get("catalog_name", "").strip().upper()
canonical_name = row.get("canonical_name", "").strip().upper() if not catalog_name:
if not canonical_name:
continue continue
for normalized_name in normalized_names: for normalized_name in normalized_names:
if normalized_name in canonical_name or canonical_name in normalized_name: if normalized_name in catalog_name or catalog_name in normalized_name:
contains_rows.append(row) if add_catalog_id(catalog_row.get("catalog_id", ""), "catalog name contains match"):
return suggestions
break break
add_matches(contains_rows, "canonical name contains match")
return suggestions return suggestions
def build_display_lines(queue_row, related_rows): def build_display_lines(related_rows):
lines = [] lines = []
for index, row in enumerate(sort_related_items(related_rows), start=1): for index, row in enumerate(sort_related_items(related_rows), start=1):
lines.append( lines.append(
@@ -197,41 +201,38 @@ def build_display_lines(queue_row, related_rows):
return lines return lines
def observed_name(queue_row, related_rows): def normalized_label(queue_row, related_rows):
if queue_row.get("normalized_names"): if queue_row.get("normalized_names"):
return queue_row["normalized_names"].split(" | ")[0] return queue_row["normalized_names"].split(" | ")[0]
for row in related_rows: for row in related_rows:
if row.get("normalized_item_name"): if row.get("normalized_item_name"):
return row["normalized_item_name"] return row["normalized_item_name"]
return queue_row.get("observed_product_id", "") return queue_row.get("normalized_item_id", "")
def choose_existing_canonical(display_rows, observed_label, matched_count): def choose_existing_catalog(display_rows, normalized_name, matched_count):
click.secho( click.secho(
f"Select the canonical_name to associate {matched_count} items with:", f"Select the catalog_name to associate {matched_count} items with:",
fg=INFO_COLOR, fg=INFO_COLOR,
) )
for index, row in enumerate(display_rows, start=1): for index, row in enumerate(display_rows, start=1):
click.echo(f" [{index}] {row['canonical_name']} | {row['canonical_product_id']}") click.echo(f" [{index}] {row['catalog_name']} | {row['catalog_id']}")
choice = click.prompt( choice = click.prompt(
click.style("selection", fg=PROMPT_COLOR), click.style("selection", fg=PROMPT_COLOR),
type=click.IntRange(1, len(display_rows)), type=click.IntRange(1, len(display_rows)),
) )
chosen_row = display_rows[choice - 1] chosen_row = display_rows[choice - 1]
click.echo( click.echo(
f'{matched_count} "{observed_label}" items and future matches will be associated ' f'{matched_count} "{normalized_name}" items and future matches will be associated '
f'with "{chosen_row["canonical_name"]}".' f'with "{chosen_row["catalog_name"]}".'
)
click.secho(
"actions: [y]es [n]o [b]ack [s]kip [q]uit",
fg=PROMPT_COLOR,
) )
click.secho("actions: [y]es [n]o [b]ack [s]kip [q]uit", fg=PROMPT_COLOR)
confirm = click.prompt( confirm = click.prompt(
click.style("confirm", fg=PROMPT_COLOR), click.style("confirm", fg=PROMPT_COLOR),
type=click.Choice(["y", "n", "b", "s", "q"]), type=click.Choice(["y", "n", "b", "s", "q"]),
) )
if confirm == "y": if confirm == "y":
return chosen_row["canonical_product_id"], "" return chosen_row["catalog_id"], ""
if confirm == "s": if confirm == "s":
return "", "skip" return "", "skip"
if confirm == "q": if confirm == "q":
@@ -239,54 +240,43 @@ def choose_existing_canonical(display_rows, observed_label, matched_count):
return "", "back" return "", "back"
def prompt_resolution(queue_row, related_rows, catalog_rows, queue_index, queue_total): def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total):
suggestions = build_canonical_suggestions(related_rows, catalog_rows) suggestions = build_catalog_suggestions(related_rows, purchase_rows, catalog_rows)
observed_label = observed_name(queue_row, related_rows) normalized_name = normalized_label(queue_row, related_rows)
matched_count = len(related_rows) matched_count = len(related_rows)
click.echo("") click.echo("")
click.secho( click.secho(
f"Review {queue_index}/{queue_total}: Resolve observed_product {observed_label} " f"Review {queue_index}/{queue_total}: Resolve normalized_item {normalized_name} "
"to canonical_name [__]?", "to catalog_name [__]?",
fg=INFO_COLOR, fg=INFO_COLOR,
) )
click.echo(f"{matched_count} matched items:") click.echo(f"{matched_count} matched items:")
for line in build_display_lines(queue_row, related_rows): for line in build_display_lines(related_rows):
click.echo(line) click.echo(line)
if suggestions: if suggestions:
click.echo(f"{len(suggestions)} canonical suggestions found:") click.echo(f"{len(suggestions)} catalog_name suggestions found:")
for index, suggestion in enumerate(suggestions, start=1): for index, suggestion in enumerate(suggestions, start=1):
click.echo(f" [{index}] {suggestion['canonical_name']}") click.echo(f" [{index}] {suggestion['catalog_name']}")
else: else:
click.echo("no canonical_name suggestions found") click.echo("no catalog_name suggestions found")
click.secho( click.secho("[l]ink existing [n]ew catalog e[x]clude [s]kip [q]uit:", fg=PROMPT_COLOR)
"[l]ink existing [n]ew canonical e[x]clude [s]kip [q]uit:", action = click.prompt("", type=click.Choice(["l", "n", "x", "s", "q"]), prompt_suffix=" ")
fg=PROMPT_COLOR,
)
action = click.prompt(
"",
type=click.Choice(["l", "n", "x", "s", "q"]),
prompt_suffix=" ",
)
if action == "q": if action == "q":
return None, None return None, None
if action == "s": if action == "s":
return { return {
"observed_product_id": queue_row["observed_product_id"], "normalized_item_id": queue_row["normalized_item_id"],
"canonical_product_id": "", "catalog_id": "",
"resolution_action": "skip", "resolution_action": "skip",
"status": "pending", "status": "pending",
"resolution_notes": queue_row.get("resolution_notes", ""), "resolution_notes": queue_row.get("resolution_notes", ""),
"reviewed_at": str(date.today()), "reviewed_at": str(date.today()),
}, None }, None
if action == "x": if action == "x":
notes = click.prompt( notes = click.prompt(click.style("exclude notes", fg=PROMPT_COLOR), default="", show_default=False)
click.style("exclude notes", fg=PROMPT_COLOR),
default="",
show_default=False,
)
return { return {
"observed_product_id": queue_row["observed_product_id"], "normalized_item_id": queue_row["normalized_item_id"],
"canonical_product_id": "", "catalog_id": "",
"resolution_action": "exclude", "resolution_action": "exclude",
"status": "approved", "status": "approved",
"resolution_notes": notes, "resolution_notes": notes,
@@ -295,22 +285,19 @@ def prompt_resolution(queue_row, related_rows, catalog_rows, queue_index, queue_
if action == "l": if action == "l":
display_rows = suggestions or [ display_rows = suggestions or [
{ {
"canonical_product_id": row["canonical_product_id"], "catalog_id": row["catalog_id"],
"canonical_name": row["canonical_name"], "catalog_name": row["catalog_name"],
"reason": "catalog sample", "reason": "catalog sample",
} }
for row in catalog_rows[:10] for row in catalog_rows[:10]
if row.get("catalog_id")
] ]
while True: while True:
canonical_product_id, outcome = choose_existing_canonical( catalog_id, outcome = choose_existing_catalog(display_rows, normalized_name, matched_count)
display_rows,
observed_label,
matched_count,
)
if outcome == "skip": if outcome == "skip":
return { return {
"observed_product_id": queue_row["observed_product_id"], "normalized_item_id": queue_row["normalized_item_id"],
"canonical_product_id": "", "catalog_id": "",
"resolution_action": "skip", "resolution_action": "skip",
"status": "pending", "status": "pending",
"resolution_notes": queue_row.get("resolution_notes", ""), "resolution_notes": queue_row.get("resolution_notes", ""),
@@ -323,34 +310,22 @@ def prompt_resolution(queue_row, related_rows, catalog_rows, queue_index, queue_
break break
notes = click.prompt(click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False) notes = click.prompt(click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False)
return { return {
"observed_product_id": queue_row["observed_product_id"], "normalized_item_id": queue_row["normalized_item_id"],
"canonical_product_id": canonical_product_id, "catalog_id": catalog_id,
"resolution_action": "link", "resolution_action": "link",
"status": "approved", "status": "approved",
"resolution_notes": notes, "resolution_notes": notes,
"reviewed_at": str(date.today()), "reviewed_at": str(date.today()),
}, None }, None
canonical_name = click.prompt(click.style("canonical name", fg=PROMPT_COLOR), type=str) catalog_name = click.prompt(click.style("catalog name", fg=PROMPT_COLOR), type=str)
category = click.prompt( category = click.prompt(click.style("category", fg=PROMPT_COLOR), default="", show_default=False)
click.style("category", fg=PROMPT_COLOR), product_type = click.prompt(click.style("product type", fg=PROMPT_COLOR), default="", show_default=False)
default="", notes = click.prompt(click.style("notes", fg=PROMPT_COLOR), default="", show_default=False)
show_default=False, catalog_id = stable_id("cat", f"manual|{catalog_name}|{category}|{product_type}")
) catalog_row = {
product_type = click.prompt( "catalog_id": catalog_id,
click.style("product type", fg=PROMPT_COLOR), "catalog_name": catalog_name,
default="",
show_default=False,
)
notes = click.prompt(
click.style("notes", fg=PROMPT_COLOR),
default="",
show_default=False,
)
canonical_product_id = stable_id("gcan", f"manual|{canonical_name}|{category}|{product_type}")
canonical_row = {
"canonical_product_id": canonical_product_id,
"canonical_name": canonical_name,
"category": category, "category": category,
"product_type": product_type, "product_type": product_type,
"brand": "", "brand": "",
@@ -364,27 +339,27 @@ def prompt_resolution(queue_row, related_rows, catalog_rows, queue_index, queue_
"updated_at": str(date.today()), "updated_at": str(date.today()),
} }
resolution_row = { resolution_row = {
"observed_product_id": queue_row["observed_product_id"], "normalized_item_id": queue_row["normalized_item_id"],
"canonical_product_id": canonical_product_id, "catalog_id": catalog_id,
"resolution_action": "create", "resolution_action": "create",
"status": "approved", "status": "approved",
"resolution_notes": notes, "resolution_notes": notes,
"reviewed_at": str(date.today()), "reviewed_at": str(date.today()),
} }
return resolution_row, canonical_row return resolution_row, catalog_row
@click.command() @click.command()
@click.option("--purchases-csv", default="combined_output/purchases.csv", show_default=True) @click.option("--purchases-csv", default="data/review/purchases.csv", show_default=True)
@click.option("--queue-csv", default="combined_output/review_queue.csv", show_default=True) @click.option("--queue-csv", default="data/review/review_queue.csv", show_default=True)
@click.option("--resolutions-csv", default="combined_output/review_resolutions.csv", show_default=True) @click.option("--resolutions-csv", default="data/review/review_resolutions.csv", show_default=True)
@click.option("--catalog-csv", default="combined_output/canonical_catalog.csv", show_default=True) @click.option("--catalog-csv", default="data/catalog.csv", show_default=True)
@click.option("--limit", default=0, show_default=True, type=int) @click.option("--limit", default=0, show_default=True, type=int)
@click.option("--refresh-only", is_flag=True, help="Only rebuild review_queue.csv without prompting.") @click.option("--refresh-only", is_flag=True, help="Only rebuild review_queue.csv without prompting.")
def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, limit, refresh_only): def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, limit, refresh_only):
purchase_rows = build_purchases.read_optional_csv_rows(purchases_csv) purchase_rows = build_purchases.read_optional_csv_rows(purchases_csv)
resolution_rows = build_purchases.read_optional_csv_rows(resolutions_csv) resolution_rows = build_purchases.read_optional_csv_rows(resolutions_csv)
catalog_rows = build_purchases.read_optional_csv_rows(catalog_csv) catalog_rows = build_purchases.merge_catalog_rows(build_purchases.read_optional_csv_rows(catalog_csv), [])
queue_rows = build_review_queue(purchase_rows, resolution_rows) queue_rows = build_review_queue(purchase_rows, resolution_rows)
write_csv_rows(queue_csv, queue_rows, QUEUE_FIELDS) write_csv_rows(queue_csv, queue_rows, QUEUE_FIELDS)
click.echo(f"wrote {len(queue_rows)} rows to {queue_csv}") click.echo(f"wrote {len(queue_rows)} rows to {queue_csv}")
@@ -393,29 +368,33 @@ def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, limit, refresh_
return return
resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows) resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows)
catalog_by_id = {row["canonical_product_id"]: row for row in catalog_rows if row.get("canonical_product_id")} catalog_by_id = {row["catalog_id"]: row for row in catalog_rows if row.get("catalog_id")}
rows_by_observed = defaultdict(list) rows_by_normalized = defaultdict(list)
for row in purchase_rows: for row in purchase_rows:
observed_product_id = row.get("observed_product_id", "") normalized_item_id = row.get("normalized_item_id", "")
if observed_product_id: if normalized_item_id:
rows_by_observed[observed_product_id].append(row) rows_by_normalized[normalized_item_id].append(row)
reviewed = 0 reviewed = 0
for index, queue_row in enumerate(queue_rows, start=1): for index, queue_row in enumerate(queue_rows, start=1):
if limit and reviewed >= limit: if limit and reviewed >= limit:
break break
related_rows = rows_by_observed.get(queue_row["observed_product_id"], []) related_rows = rows_by_normalized.get(queue_row["normalized_item_id"], [])
result = prompt_resolution(queue_row, related_rows, catalog_rows, index, len(queue_rows)) result = prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, index, len(queue_rows))
if result == (None, None): if result == (None, None):
break break
resolution_row, canonical_row = result resolution_row, catalog_row = result
resolution_lookup[resolution_row["observed_product_id"]] = resolution_row resolution_lookup[resolution_row["normalized_item_id"]] = resolution_row
if canonical_row and canonical_row["canonical_product_id"] not in catalog_by_id: if catalog_row and catalog_row["catalog_id"] not in catalog_by_id:
catalog_by_id[canonical_row["canonical_product_id"]] = canonical_row catalog_by_id[catalog_row["catalog_id"]] = catalog_row
catalog_rows.append(canonical_row) catalog_rows.append(catalog_row)
reviewed += 1 reviewed += 1
save_resolution_rows(resolutions_csv, sorted(resolution_lookup.values(), key=lambda row: row["observed_product_id"])) save_resolution_rows(
save_catalog_rows(catalog_csv, sorted(catalog_by_id.values(), key=lambda row: row["canonical_product_id"])) resolutions_csv,
sorted(resolution_lookup.values(), key=lambda row: row["normalized_item_id"]),
)
save_catalog_rows(catalog_csv, sorted(catalog_by_id.values(), key=lambda row: row["catalog_id"]))
click.echo( click.echo(
f"saved {len(resolution_lookup)} resolution rows to {resolutions_csv} " f"saved {len(resolution_lookup)} resolution rows to {resolutions_csv} "
f"and {len(catalog_by_id)} catalog rows to {catalog_csv}" f"and {len(catalog_by_id)} catalog rows to {catalog_csv}"
@@ -285,6 +285,47 @@ class CostcoPipelineTests(unittest.TestCase):
self.assertEqual("true", discount["is_coupon_line"]) self.assertEqual("true", discount["is_coupon_line"])
self.assertEqual("false", discount["is_item"]) self.assertEqual("false", discount["is_item"])
def test_costco_name_cleanup_removes_dual_weight_and_logistics_artifacts(self):
mixed_units = enrich_costco.parse_costco_item(
order_id="abc",
order_date="2026-03-12",
raw_path=Path("costco_output/raw/abc.json"),
line_no=1,
item={
"itemNumber": "18600",
"itemDescription01": "MANDARINS 2.27 KG / 5 LBS",
"itemDescription02": None,
"itemDepartmentNumber": 65,
"transDepartmentNumber": 65,
"unit": 1,
"itemIdentifier": "E",
"amount": 7.49,
"itemUnitPriceAmount": 7.49,
},
)
self.assertEqual("MANDARIN", mixed_units["item_name_norm"])
self.assertEqual("5", mixed_units["size_value"])
self.assertEqual("lb", mixed_units["size_unit"])
logistics = enrich_costco.parse_costco_item(
order_id="abc",
order_date="2026-03-12",
raw_path=Path("costco_output/raw/abc.json"),
line_no=2,
item={
"itemNumber": "1375005",
"itemDescription01": "LIFE 6'TABLE MDL #80873U - T12/H3/P36",
"itemDescription02": None,
"itemDepartmentNumber": 18,
"transDepartmentNumber": 18,
"unit": 1,
"itemIdentifier": "E",
"amount": 119.98,
"itemUnitPriceAmount": 119.98,
},
)
self.assertEqual("LIFE 6'TABLE MDL", logistics["item_name_norm"])
def test_build_items_enriched_matches_discount_to_item(self): def test_build_items_enriched_matches_discount_to_item(self):
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
raw_dir = Path(tmpdir) / "raw" raw_dir = Path(tmpdir) / "raw"
@@ -13,6 +13,7 @@ class PipelineStatusTests(unittest.TestCase):
"retailer": "giant", "retailer": "giant",
"order_id": "g1", "order_id": "g1",
"line_no": "1", "line_no": "1",
"normalized_item_id": "gnorm_banana",
"item_name_norm": "BANANA", "item_name_norm": "BANANA",
"item_name": "FRESH BANANA", "item_name": "FRESH BANANA",
"retailer_item_id": "1", "retailer_item_id": "1",
@@ -37,8 +38,8 @@ class PipelineStatusTests(unittest.TestCase):
costco_enriched=[], costco_enriched=[],
purchases=[ purchases=[
{ {
"observed_product_id": "gobs_banana", "normalized_item_id": "gnorm_banana",
"canonical_product_id": "gcan_banana", "catalog_id": "cat_banana",
"resolution_action": "", "resolution_action": "",
"is_fee": "false", "is_fee": "false",
"is_discount_line": "false", "is_discount_line": "false",
@@ -50,8 +51,8 @@ class PipelineStatusTests(unittest.TestCase):
"line_total": "1.29", "line_total": "1.29",
}, },
{ {
"observed_product_id": "gobs_lime", "normalized_item_id": "cnorm_lime",
"canonical_product_id": "", "catalog_id": "",
"resolution_action": "", "resolution_action": "",
"is_fee": "false", "is_fee": "false",
"is_discount_line": "false", "is_discount_line": "false",
@@ -69,10 +70,10 @@ class PipelineStatusTests(unittest.TestCase):
counts = {row["stage"]: row["count"] for row in summary} counts = {row["stage"]: row["count"] for row in summary}
self.assertEqual(1, counts["raw_orders"]) self.assertEqual(1, counts["raw_orders"])
self.assertEqual(1, counts["raw_items"]) self.assertEqual(1, counts["raw_items"])
self.assertEqual(1, counts["enriched_items"]) self.assertEqual(1, counts["normalized_items"])
self.assertEqual(1, counts["canonical_linked_purchase_rows"]) self.assertEqual(1, counts["linked_purchase_rows"])
self.assertEqual(1, counts["unresolved_purchase_rows"]) self.assertEqual(1, counts["unresolved_purchase_rows"])
self.assertEqual(1, counts["review_queue_observed_products"]) self.assertEqual(1, counts["review_queue_normalized_items"])
self.assertEqual(0, counts["unresolved_not_in_review_rows"]) self.assertEqual(0, counts["unresolved_not_in_review_rows"])
@@ -29,7 +29,7 @@ class PurchaseLogTests(unittest.TestCase):
 self.assertEqual("0.125", metrics["price_per_oz"])
 self.assertEqual("picked_weight_lb", metrics["price_per_lb_basis"])
-def test_build_purchase_rows_maps_canonical_ids(self):
+def test_build_purchase_rows_maps_catalog_ids(self):
 fieldnames = enrich_costco.OUTPUT_FIELDS
 giant_row = {field: "" for field in fieldnames}
 giant_row.update(
@@ -37,7 +38,8 @@ class PurchaseLogTests(unittest.TestCase):
 "retailer": "giant",
 "order_id": "g1",
 "line_no": "1",
-"observed_item_key": "giant:g1:1",
+"normalized_row_id": "giant:g1:1",
+"normalized_item_id": "gnorm:banana",
 "order_date": "2026-03-01",
 "item_name": "FRESH BANANA",
 "item_name_norm": "BANANA",
@@ -50,7 +51,7 @@ class PurchaseLogTests(unittest.TestCase):
 "unit_price": "1.29",
 "measure_type": "weight",
 "price_per_lb": "1.29",
-"raw_order_path": "giant_output/raw/g1.json",
+"raw_order_path": "data/giant-web/raw/g1.json",
 "is_discount_line": "false",
 "is_coupon_line": "false",
 "is_fee": "false",
@@ -62,7 +63,8 @@ class PurchaseLogTests(unittest.TestCase):
 "retailer": "costco",
 "order_id": "c1",
 "line_no": "1",
-"observed_item_key": "costco:c1:1",
+"normalized_row_id": "costco:c1:1",
+"normalized_item_id": "cnorm:banana",
 "order_date": "2026-03-12",
 "item_name": "BANANAS 3 LB / 1.36 KG",
 "item_name_norm": "BANANA",
@@ -75,7 +77,7 @@ class PurchaseLogTests(unittest.TestCase):
 "size_unit": "lb",
 "measure_type": "weight",
 "price_per_lb": "0.9933",
-"raw_order_path": "costco_output/raw/c1.json",
+"raw_order_path": "data/costco-web/raw/c1.json",
 "is_discount_line": "false",
 "is_coupon_line": "false",
 "is_fee": "false",
@@ -99,17 +101,58 @@ class PurchaseLogTests(unittest.TestCase):
 "store_state": "VA",
 }
 ]
+catalog_rows = [
+{
+"catalog_id": "cat_banana",
+"catalog_name": "BANANA",
+"category": "produce",
+"product_type": "banana",
+"brand": "",
+"variant": "",
+"size_value": "",
+"size_unit": "",
+"pack_qty": "",
+"measure_type": "",
+"notes": "",
+"created_at": "",
+"updated_at": "",
+}
+]
+link_rows = [
+{
+"normalized_item_id": "gnorm:banana",
+"catalog_id": "cat_banana",
+"link_method": "manual_link",
+"link_confidence": "high",
+"review_status": "approved",
+"reviewed_by": "",
+"reviewed_at": "",
+"link_notes": "",
+},
+{
+"normalized_item_id": "cnorm:banana",
+"catalog_id": "cat_banana",
+"link_method": "manual_link",
+"link_confidence": "high",
+"review_status": "approved",
+"reviewed_by": "",
+"reviewed_at": "",
+"link_notes": "",
+},
+]
-rows, _observed, _canon, _links = build_purchases.build_purchase_rows(
+rows, _links = build_purchases.build_purchase_rows(
 [giant_row],
 [costco_row],
 giant_orders,
 costco_orders,
 [],
+link_rows,
+catalog_rows,
 )
 self.assertEqual(2, len(rows))
-self.assertTrue(all(row["canonical_product_id"] for row in rows))
+self.assertTrue(all(row["catalog_id"] == "cat_banana" for row in rows))
 self.assertEqual({"giant", "costco"}, {row["retailer"] for row in rows})
 self.assertEqual("https://example.test/banana.jpg", rows[0]["image_url"])
@@ -120,10 +163,10 @@ class PurchaseLogTests(unittest.TestCase):
 giant_orders = Path(tmpdir) / "giant_orders.csv"
 costco_orders = Path(tmpdir) / "costco_orders.csv"
 resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
-catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
+catalog_csv = Path(tmpdir) / "catalog.csv"
 links_csv = Path(tmpdir) / "product_links.csv"
-purchases_csv = Path(tmpdir) / "combined" / "purchases.csv"
-examples_csv = Path(tmpdir) / "combined" / "comparison_examples.csv"
+purchases_csv = Path(tmpdir) / "review" / "purchases.csv"
+examples_csv = Path(tmpdir) / "review" / "comparison_examples.csv"
 fieldnames = enrich_costco.OUTPUT_FIELDS
 giant_row = {field: "" for field in fieldnames}
@@ -132,7 +175,8 @@ class PurchaseLogTests(unittest.TestCase):
 "retailer": "giant",
 "order_id": "g1",
 "line_no": "1",
-"observed_item_key": "giant:g1:1",
+"normalized_row_id": "giant:g1:1",
+"normalized_item_id": "gnorm:banana",
 "order_date": "2026-03-01",
 "item_name": "FRESH BANANA",
 "item_name_norm": "BANANA",
@@ -144,7 +188,7 @@ class PurchaseLogTests(unittest.TestCase):
 "unit_price": "1.29",
 "measure_type": "weight",
 "price_per_lb": "1.29",
-"raw_order_path": "giant_output/raw/g1.json",
+"raw_order_path": "data/giant-web/raw/g1.json",
 "is_discount_line": "false",
 "is_coupon_line": "false",
 "is_fee": "false",
@@ -156,7 +200,8 @@ class PurchaseLogTests(unittest.TestCase):
 "retailer": "costco",
 "order_id": "c1",
 "line_no": "1",
-"observed_item_key": "costco:c1:1",
+"normalized_row_id": "costco:c1:1",
+"normalized_item_id": "cnorm:banana",
 "order_date": "2026-03-12",
 "item_name": "BANANAS 3 LB / 1.36 KG",
 "item_name_norm": "BANANA",
@@ -169,17 +214,14 @@ class PurchaseLogTests(unittest.TestCase):
 "size_unit": "lb",
 "measure_type": "weight",
 "price_per_lb": "0.9933",
-"raw_order_path": "costco_output/raw/c1.json",
+"raw_order_path": "data/costco-web/raw/c1.json",
 "is_discount_line": "false",
 "is_coupon_line": "false",
 "is_fee": "false",
 }
 )
-for path, source_rows in [
-(giant_items, [giant_row]),
-(costco_items, [costco_row]),
-]:
+for path, source_rows in [(giant_items, [giant_row]), (costco_items, [costco_row])]:
 with path.open("w", newline="", encoding="utf-8") as handle:
 writer = csv.DictWriter(handle, fieldnames=fieldnames)
 writer.writeheader()
@@ -217,6 +259,55 @@ class PurchaseLogTests(unittest.TestCase):
 writer.writeheader()
 writer.writerows(source_rows)
+with catalog_csv.open("w", newline="", encoding="utf-8") as handle:
+writer = csv.DictWriter(handle, fieldnames=build_purchases.CATALOG_FIELDS)
+writer.writeheader()
+writer.writerow(
+{
+"catalog_id": "cat_banana",
+"catalog_name": "BANANA",
+"category": "produce",
+"product_type": "banana",
+"brand": "",
+"variant": "",
+"size_value": "",
+"size_unit": "",
+"pack_qty": "",
+"measure_type": "",
+"notes": "",
+"created_at": "",
+"updated_at": "",
+}
+)
+with links_csv.open("w", newline="", encoding="utf-8") as handle:
+writer = csv.DictWriter(handle, fieldnames=build_purchases.PRODUCT_LINK_FIELDS)
+writer.writeheader()
+writer.writerows(
+[
+{
+"normalized_item_id": "gnorm:banana",
+"catalog_id": "cat_banana",
+"link_method": "manual_link",
+"link_confidence": "high",
+"review_status": "approved",
+"reviewed_by": "",
+"reviewed_at": "",
+"link_notes": "",
+},
+{
+"normalized_item_id": "cnorm:banana",
+"catalog_id": "cat_banana",
+"link_method": "manual_link",
+"link_confidence": "high",
+"review_status": "approved",
+"reviewed_by": "",
+"reviewed_at": "",
+"link_notes": "",
+},
+]
+)
 build_purchases.main.callback(
 giant_items_enriched_csv=str(giant_items),
 costco_items_enriched_csv=str(costco_items),
@@ -246,7 +337,8 @@ class PurchaseLogTests(unittest.TestCase):
 "retailer": "giant",
 "order_id": "g1",
 "line_no": "1",
-"observed_item_key": "giant:g1:1",
+"normalized_row_id": "giant:g1:1",
+"normalized_item_id": "gnorm:ice",
 "order_date": "2026-03-01",
 "item_name": "SB BAGGED ICE 20LB",
 "item_name_norm": "BAGGED ICE",
@@ -257,17 +349,14 @@ class PurchaseLogTests(unittest.TestCase):
 "line_total": "3.50",
 "unit_price": "3.50",
 "measure_type": "each",
-"raw_order_path": "giant_output/raw/g1.json",
+"raw_order_path": "data/giant-web/raw/g1.json",
 "is_discount_line": "false",
 "is_coupon_line": "false",
 "is_fee": "false",
 }
 )
-observed_rows, _canonical_rows, _link_rows, _observed_id_by_key, _canonical_by_observed = (
-build_purchases.build_link_state([giant_row])
-)
-observed_product_id = observed_rows[0]["observed_product_id"]
-rows, _observed, _canon, _links = build_purchases.build_purchase_rows(
+rows, links = build_purchases.build_purchase_rows(
 [giant_row],
 [],
 [
@@ -282,19 +371,38 @@ class PurchaseLogTests(unittest.TestCase):
 [],
 [
 {
-"observed_product_id": observed_product_id,
-"canonical_product_id": "gcan_manual_ice",
+"normalized_item_id": "gnorm:ice",
+"catalog_id": "cat_ice",
 "resolution_action": "create",
 "status": "approved",
 "resolution_notes": "manual ice merge",
 "reviewed_at": "2026-03-16",
 }
 ],
+[],
+[
+{
+"catalog_id": "cat_ice",
+"catalog_name": "ICE",
+"category": "frozen",
+"product_type": "ice",
+"brand": "",
+"variant": "",
+"size_value": "",
+"size_unit": "",
+"pack_qty": "",
+"measure_type": "",
+"notes": "",
+"created_at": "",
+"updated_at": "",
+}
+],
 )
-self.assertEqual("gcan_manual_ice", rows[0]["canonical_product_id"])
+self.assertEqual("cat_ice", rows[0]["catalog_id"])
 self.assertEqual("approved", rows[0]["review_status"])
 self.assertEqual("create", rows[0]["resolution_action"])
+self.assertEqual("cat_ice", links[0]["catalog_id"])
 if __name__ == "__main__":
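The refactored `build_purchase_rows` signature above takes `link_rows` and `catalog_rows` and stamps each purchase row with a `catalog_id`. A minimal sketch of the underlying join, assuming only approved links count — `attach_catalog_ids` is a hypothetical name, and the real function also applies resolutions, fee flags, and discount flags:

```python
def attach_catalog_ids(item_rows, link_rows, catalog_rows):
    """Join normalized items to catalog entries via approved product links."""
    # Index catalog entries so we can verify a link points at a real entry.
    catalog_by_id = {c["catalog_id"]: c for c in catalog_rows}
    # Only approved links participate in the join.
    link_by_item = {
        link["normalized_item_id"]: link["catalog_id"]
        for link in link_rows
        if link.get("review_status") == "approved"
    }
    out = []
    for row in item_rows:
        catalog_id = link_by_item.get(row["normalized_item_id"], "")
        merged = dict(row)
        # Leave catalog_id blank (unresolved) if the link target is unknown.
        merged["catalog_id"] = catalog_id if catalog_id in catalog_by_id else ""
        out.append(merged)
    return out
```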

View File

@@ -14,33 +14,39 @@ class ReviewWorkflowTests(unittest.TestCase):
 queue_rows = review_products.build_review_queue(
 [
 {
-"observed_product_id": "gobs_1",
-"canonical_product_id": "",
+"normalized_item_id": "gnorm_1",
+"catalog_id": "",
 "retailer": "giant",
 "raw_item_name": "SB BAGGED ICE 20LB",
 "normalized_item_name": "BAGGED ICE",
 "upc": "",
 "line_total": "3.50",
+"is_fee": "false",
+"is_discount_line": "false",
+"is_coupon_line": "false",
 },
 {
-"observed_product_id": "gobs_1",
-"canonical_product_id": "",
+"normalized_item_id": "gnorm_1",
+"catalog_id": "",
 "retailer": "giant",
 "raw_item_name": "SB BAG ICE CUBED 10LB",
 "normalized_item_name": "BAG ICE",
 "upc": "",
 "line_total": "2.50",
+"is_fee": "false",
+"is_discount_line": "false",
+"is_coupon_line": "false",
 },
 ],
 [],
 )
 self.assertEqual(1, len(queue_rows))
-self.assertEqual("gobs_1", queue_rows[0]["observed_product_id"])
+self.assertEqual("gnorm_1", queue_rows[0]["normalized_item_id"])
 self.assertIn("SB BAGGED ICE 20LB", queue_rows[0]["raw_item_names"])
-def test_build_canonical_suggestions_prefers_upc_then_name(self):
-suggestions = review_products.build_canonical_suggestions(
+def test_build_catalog_suggestions_prefers_upc_then_name(self):
+suggestions = review_products.build_catalog_suggestions(
 [
 {
 "normalized_item_name": "MIXED PEPPER",
@@ -49,36 +55,41 @@ class ReviewWorkflowTests(unittest.TestCase):
 ],
 [
 {
-"canonical_product_id": "gcan_1",
-"canonical_name": "MIXED PEPPER",
-"upc": "",
+"normalized_item_id": "prior_1",
+"normalized_item_name": "MIXED PEPPER 6 PACK",
+"upc": "12345",
+"catalog_id": "cat_2",
+}
+],
+[
+{
+"catalog_id": "cat_1",
+"catalog_name": "MIXED PEPPER",
 },
 {
-"canonical_product_id": "gcan_2",
-"canonical_name": "MIXED PEPPER 6 PACK",
-"upc": "12345",
+"catalog_id": "cat_2",
+"catalog_name": "MIXED PEPPER 6 PACK",
 },
 ],
 )
-self.assertEqual("gcan_2", suggestions[0]["canonical_product_id"])
+self.assertEqual("cat_2", suggestions[0]["catalog_id"])
 self.assertEqual("exact upc", suggestions[0]["reason"])
-self.assertEqual("gcan_1", suggestions[1]["canonical_product_id"])
 def test_review_products_displays_position_items_and_suggestions(self):
 with tempfile.TemporaryDirectory() as tmpdir:
 purchases_csv = Path(tmpdir) / "purchases.csv"
 queue_csv = Path(tmpdir) / "review_queue.csv"
 resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
-catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
+catalog_csv = Path(tmpdir) / "catalog.csv"
 purchase_fields = [
 "purchase_date",
 "retailer",
 "order_id",
 "line_no",
-"observed_product_id",
-"canonical_product_id",
+"normalized_item_id",
+"catalog_id",
 "raw_item_name",
 "normalized_item_name",
 "image_url",
@@ -95,8 +106,8 @@ class ReviewWorkflowTests(unittest.TestCase):
 "retailer": "costco",
 "order_id": "c2",
 "line_no": "2",
-"observed_product_id": "gobs_mix",
-"canonical_product_id": "",
+"normalized_item_id": "cnorm_mix",
+"catalog_id": "",
 "raw_item_name": "MIXED PEPPER 6-PACK",
 "normalized_item_name": "MIXED PEPPER",
 "image_url": "",
@@ -108,14 +119,27 @@ class ReviewWorkflowTests(unittest.TestCase):
 "retailer": "costco",
 "order_id": "c1",
 "line_no": "1",
-"observed_product_id": "gobs_mix",
-"canonical_product_id": "",
+"normalized_item_id": "cnorm_mix",
+"catalog_id": "",
 "raw_item_name": "MIXED PEPPER 6-PACK",
 "normalized_item_name": "MIXED PEPPER",
 "image_url": "https://example.test/mixed-pepper.jpg",
 "upc": "",
 "line_total": "6.99",
 },
+{
+"purchase_date": "2026-03-10",
+"retailer": "giant",
+"order_id": "g1",
+"line_no": "1",
+"normalized_item_id": "gnorm_mix",
+"catalog_id": "cat_mix",
+"raw_item_name": "MIXED PEPPER",
+"normalized_item_name": "MIXED PEPPER",
+"image_url": "",
+"upc": "",
+"line_total": "5.99",
+},
 ]
 )
@@ -124,8 +148,8 @@ class ReviewWorkflowTests(unittest.TestCase):
 writer.writeheader()
 writer.writerow(
 {
-"canonical_product_id": "gcan_mix",
-"canonical_name": "MIXED PEPPER",
+"catalog_id": "cat_mix",
+"catalog_name": "MIXED PEPPER",
 "category": "produce",
 "product_type": "pepper",
 "brand": "",
@@ -158,14 +182,14 @@ class ReviewWorkflowTests(unittest.TestCase):
 )
 self.assertEqual(0, result.exit_code)
-self.assertIn("Review 1/1: Resolve observed_product MIXED PEPPER to canonical_name [__]?", result.output)
+self.assertIn("Review 1/1: Resolve normalized_item MIXED PEPPER to catalog_name [__]?", result.output)
 self.assertIn("2 matched items:", result.output)
-self.assertIn("[l]ink existing [n]ew canonical e[x]clude [s]kip [q]uit:", result.output)
+self.assertIn("[l]ink existing [n]ew catalog e[x]clude [s]kip [q]uit:", result.output)
 first_item = result.output.index("[1] 2026-03-14 | 7.49")
 second_item = result.output.index("[2] 2026-03-12 | 6.99")
 self.assertLess(first_item, second_item)
 self.assertIn("https://example.test/mixed-pepper.jpg", result.output)
-self.assertIn("1 canonical suggestions found:", result.output)
+self.assertIn("1 catalog_name suggestions found:", result.output)
 self.assertIn("[1] MIXED PEPPER", result.output)
 self.assertIn("\x1b[", result.output)
@@ -174,7 +198,7 @@ class ReviewWorkflowTests(unittest.TestCase):
 purchases_csv = Path(tmpdir) / "purchases.csv"
 queue_csv = Path(tmpdir) / "review_queue.csv"
 resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
-catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
+catalog_csv = Path(tmpdir) / "catalog.csv"
 with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
 writer = csv.DictWriter(
@@ -184,8 +208,8 @@ class ReviewWorkflowTests(unittest.TestCase):
 "retailer",
 "order_id",
 "line_no",
-"observed_product_id",
-"canonical_product_id",
+"normalized_item_id",
+"catalog_id",
 "raw_item_name",
 "normalized_item_name",
 "image_url",
@@ -200,8 +224,8 @@ class ReviewWorkflowTests(unittest.TestCase):
 "retailer": "giant",
 "order_id": "g1",
 "line_no": "1",
-"observed_product_id": "gobs_ice",
-"canonical_product_id": "",
+"normalized_item_id": "gnorm_ice",
+"catalog_id": "",
 "raw_item_name": "SB BAGGED ICE 20LB",
 "normalized_item_name": "BAGGED ICE",
 "image_url": "",
@@ -231,14 +255,14 @@ class ReviewWorkflowTests(unittest.TestCase):
 )
 self.assertEqual(0, result.exit_code)
-self.assertIn("no canonical_name suggestions found", result.output)
+self.assertIn("no catalog_name suggestions found", result.output)
 def test_link_existing_uses_numbered_selection_and_confirmation(self):
 with tempfile.TemporaryDirectory() as tmpdir:
 purchases_csv = Path(tmpdir) / "purchases.csv"
 queue_csv = Path(tmpdir) / "review_queue.csv"
 resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
-catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
+catalog_csv = Path(tmpdir) / "catalog.csv"
 with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
 writer = csv.DictWriter(
@@ -248,8 +272,8 @@ class ReviewWorkflowTests(unittest.TestCase):
 "retailer",
 "order_id",
 "line_no",
-"observed_product_id",
-"canonical_product_id",
+"normalized_item_id",
+"catalog_id",
 "raw_item_name",
 "normalized_item_name",
 "image_url",
@@ -265,8 +289,8 @@ class ReviewWorkflowTests(unittest.TestCase):
 "retailer": "costco",
 "order_id": "c2",
 "line_no": "2",
-"observed_product_id": "gobs_mix",
-"canonical_product_id": "",
+"normalized_item_id": "cnorm_mix",
+"catalog_id": "",
 "raw_item_name": "MIXED PEPPER 6-PACK",
 "normalized_item_name": "MIXED PEPPER",
 "image_url": "",
@@ -278,14 +302,27 @@ class ReviewWorkflowTests(unittest.TestCase):
 "retailer": "costco",
 "order_id": "c1",
 "line_no": "1",
-"observed_product_id": "gobs_mix",
-"canonical_product_id": "",
+"normalized_item_id": "cnorm_mix",
+"catalog_id": "",
 "raw_item_name": "MIXED PEPPER 6-PACK",
 "normalized_item_name": "MIXED PEPPER",
 "image_url": "",
 "upc": "",
 "line_total": "6.99",
 },
+{
+"purchase_date": "2026-03-10",
+"retailer": "giant",
+"order_id": "g1",
+"line_no": "1",
+"normalized_item_id": "gnorm_mix",
+"catalog_id": "cat_mix",
+"raw_item_name": "MIXED PEPPER",
+"normalized_item_name": "MIXED PEPPER",
+"image_url": "",
+"upc": "",
+"line_total": "5.99",
+},
 ]
 )
@@ -294,8 +331,8 @@ class ReviewWorkflowTests(unittest.TestCase):
 writer.writeheader()
 writer.writerow(
 {
-"canonical_product_id": "gcan_mix",
-"canonical_name": "MIXED PEPPER",
+"catalog_id": "cat_mix",
+"catalog_name": "MIXED PEPPER",
 "category": "",
 "product_type": "",
 "brand": "",
@@ -329,29 +366,29 @@ class ReviewWorkflowTests(unittest.TestCase):
 )
 self.assertEqual(0, result.exit_code)
-self.assertIn("Select the canonical_name to associate 2 items with:", result.output)
-self.assertIn('[1] MIXED PEPPER | gcan_mix', result.output)
+self.assertIn("Select the catalog_name to associate 2 items with:", result.output)
+self.assertIn("[1] MIXED PEPPER | cat_mix", result.output)
 self.assertIn('2 "MIXED PEPPER" items and future matches will be associated with "MIXED PEPPER".', result.output)
 self.assertIn("actions: [y]es [n]o [b]ack [s]kip [q]uit", result.output)
 with resolutions_csv.open(newline="", encoding="utf-8") as handle:
 rows = list(csv.DictReader(handle))
-self.assertEqual("gcan_mix", rows[0]["canonical_product_id"])
+self.assertEqual("cat_mix", rows[0]["catalog_id"])
 self.assertEqual("link", rows[0]["resolution_action"])
-def test_review_products_creates_canonical_and_resolution(self):
+def test_review_products_creates_catalog_and_resolution(self):
 with tempfile.TemporaryDirectory() as tmpdir:
 purchases_csv = Path(tmpdir) / "purchases.csv"
 queue_csv = Path(tmpdir) / "review_queue.csv"
 resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
-catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
+catalog_csv = Path(tmpdir) / "catalog.csv"
 with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
 writer = csv.DictWriter(
 handle,
 fieldnames=[
 "purchase_date",
-"observed_product_id",
-"canonical_product_id",
+"normalized_item_id",
+"catalog_id",
 "retailer",
 "raw_item_name",
 "normalized_item_name",
@@ -366,8 +403,8 @@ class ReviewWorkflowTests(unittest.TestCase):
 writer.writerow(
 {
 "purchase_date": "2026-03-15",
-"observed_product_id": "gobs_ice",
-"canonical_product_id": "",
+"normalized_item_id": "gnorm_ice",
+"catalog_id": "",
 "retailer": "giant",
 "raw_item_name": "SB BAGGED ICE 20LB",
 "normalized_item_name": "BAGGED ICE",
@@ -402,7 +439,7 @@ class ReviewWorkflowTests(unittest.TestCase):
 catalog_rows = list(csv.DictReader(handle))
 self.assertEqual("create", resolution_rows[0]["resolution_action"])
 self.assertEqual("approved", resolution_rows[0]["status"])
-self.assertEqual("ICE", catalog_rows[0]["canonical_name"])
+self.assertEqual("ICE", catalog_rows[0]["catalog_name"])
 if __name__ == "__main__":
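`test_build_catalog_suggestions_prefers_upc_then_name` pins down the suggestion ordering: an exact UPC match against previously linked items outranks an exact name match against the catalog. A sketch of that ranking — `suggest_catalog_matches` is a hypothetical stand-in for `review_products.build_catalog_suggestions`, assuming the field names used in the fixtures:

```python
def suggest_catalog_matches(item, prior_items, catalog_rows):
    """Rank catalog candidates: exact UPC match first, then exact name match."""
    suggestions = []
    seen = set()

    def add(catalog_id, reason):
        # Deduplicate: a catalog entry suggested by UPC is not re-suggested by name.
        if catalog_id and catalog_id not in seen:
            seen.add(catalog_id)
            suggestions.append({"catalog_id": catalog_id, "reason": reason})

    upc = item.get("upc", "")
    if upc:
        # A previously linked item sharing the UPC points at its catalog entry.
        for prior in prior_items:
            if prior.get("upc") == upc:
                add(prior.get("catalog_id", ""), "exact upc")
    name = item.get("normalized_item_name", "")
    for entry in catalog_rows:
        if entry.get("catalog_name") == name:
            add(entry["catalog_id"], "exact name")
    return suggestions
```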