From 9104781b9367a8f0c46adc405e11f1b6f96a0329 Mon Sep 17 00:00:00 2001 From: ben Date: Fri, 20 Mar 2026 11:27:46 -0400 Subject: [PATCH] Refactor review pipeline around normalized items --- build_purchases.py | 249 +++++++++++++++++-------------- report_pipeline_status.py | 37 ++--- review_products.py | 267 ++++++++++++++++------------------ tests/test_pipeline_status.py | 15 +- tests/test_purchases.py | 164 +++++++++++++++++---- tests/test_review_workflow.py | 141 +++++++++++------- 6 files changed, 512 insertions(+), 361 deletions(-) diff --git a/build_purchases.py b/build_purchases.py index e7f1a74..5facc9e 100644 --- a/build_purchases.py +++ b/build_purchases.py @@ -3,11 +3,8 @@ from pathlib import Path import click -import build_canonical_layer -import build_observed_products -import validate_cross_retailer_flow from enrich_giant import format_decimal, to_decimal -from layer_helpers import read_csv_rows, stable_id, write_csv_rows +from layer_helpers import read_csv_rows, write_csv_rows PURCHASE_FIELDS = [ @@ -15,13 +12,18 @@ PURCHASE_FIELDS = [ "retailer", "order_id", "line_no", - "observed_item_key", - "observed_product_id", - "canonical_product_id", + "normalized_row_id", + "normalized_item_id", + "catalog_id", "review_status", "resolution_action", "raw_item_name", "normalized_item_name", + "catalog_name", + "category", + "product_type", + "brand", + "variant", "image_url", "retailer_item_id", "upc", @@ -55,7 +57,7 @@ PURCHASE_FIELDS = [ EXAMPLE_FIELDS = [ "example_name", - "canonical_product_id", + "catalog_id", "giant_purchase_date", "giant_raw_item_name", "giant_price_per_lb", @@ -66,8 +68,8 @@ EXAMPLE_FIELDS = [ ] CATALOG_FIELDS = [ - "canonical_product_id", - "canonical_name", + "catalog_id", + "catalog_name", "category", "product_type", "brand", @@ -81,9 +83,20 @@ CATALOG_FIELDS = [ "updated_at", ] +PRODUCT_LINK_FIELDS = [ + "normalized_item_id", + "catalog_id", + "link_method", + "link_confidence", + "review_status", + "reviewed_by", + "reviewed_at", + "link_notes", +] + RESOLUTION_FIELDS = [ - "observed_product_id", - "canonical_product_id", + "normalized_item_id", + "catalog_id", "resolution_action", "status", "resolution_notes", @@ -91,10 +104,6 @@ RESOLUTION_FIELDS = [ ] -def decimal_or_zero(value): - return to_decimal(value) or Decimal("0") - - def derive_metrics(row): line_total = to_decimal(row.get("net_line_total") or row.get("line_total")) qty = to_decimal(row.get("qty")) @@ -162,10 +171,7 @@ def derive_metrics(row): def order_lookup(rows, retailer): - return { - (retailer, row["order_id"]): row - for row in rows - } + return {(retailer, row["order_id"]): row for row in rows} def read_optional_csv_rows(path): @@ -175,28 +181,10 @@ def read_optional_csv_rows(path): return read_csv_rows(path) -def load_resolution_lookup(resolution_rows): - lookup = {} - for row in resolution_rows: - if not row.get("observed_product_id"): - continue - lookup[row["observed_product_id"]] = row - return lookup - - -def merge_catalog_rows(existing_rows, auto_rows): - merged = {} - for row in auto_rows + existing_rows: - canonical_product_id = row.get("canonical_product_id", "") - if canonical_product_id: - merged[canonical_product_id] = row - return sorted(merged.values(), key=lambda row: row["canonical_product_id"]) - - -def catalog_row_from_canonical(row): +def normalize_catalog_row(row): return { - "canonical_product_id": row.get("canonical_product_id", ""), - "canonical_name": row.get("canonical_name", ""), + "catalog_id": row.get("catalog_id") or row.get("canonical_product_id", ""), + "catalog_name": row.get("catalog_name") or row.get("canonical_name", ""), "category": row.get("category", ""), "product_type": row.get("product_type", ""), "brand": row.get("brand", ""), @@ -211,24 +199,67 @@ def catalog_row_from_canonical(row): } -def build_link_state(enriched_rows): - observed_rows = build_observed_products.build_observed_products(enriched_rows) - canonical_rows, link_rows = build_canonical_layer.build_canonical_layer(observed_rows) - giant_row, costco_row = validate_cross_retailer_flow.find_proof_pair(observed_rows) - canonical_rows, link_rows, _proof_rows = validate_cross_retailer_flow.merge_proof_pair( - canonical_rows, - link_rows, - giant_row, - costco_row, - ) +def is_review_first_catalog_row(row): + notes = row.get("notes", "").strip().lower() + if notes.startswith("auto-linked via"): + return False + return True - observed_id_by_key = { - row["observed_key"]: row["observed_product_id"] for row in observed_rows + +def normalize_link_row(row): + return { + "normalized_item_id": row.get("normalized_item_id", ""), + "catalog_id": row.get("catalog_id") or row.get("canonical_product_id", ""), + "link_method": row.get("link_method", ""), + "link_confidence": row.get("link_confidence", ""), + "review_status": row.get("review_status", ""), + "reviewed_by": row.get("reviewed_by", ""), + "reviewed_at": row.get("reviewed_at", ""), + "link_notes": row.get("link_notes", ""), } - canonical_id_by_observed = { - row["observed_product_id"]: row["canonical_product_id"] for row in link_rows + + +def normalize_resolution_row(row): + return { + "normalized_item_id": row.get("normalized_item_id", ""), + "catalog_id": row.get("catalog_id") or row.get("canonical_product_id", ""), + "resolution_action": row.get("resolution_action", ""), + "status": row.get("status", ""), + "resolution_notes": row.get("resolution_notes", ""), + "reviewed_at": row.get("reviewed_at", ""), } - return observed_rows, canonical_rows, link_rows, observed_id_by_key, canonical_id_by_observed + + +def load_resolution_lookup(resolution_rows): + lookup = {} + for row in resolution_rows: + normalized_row = normalize_resolution_row(row) + normalized_item_id = normalized_row.get("normalized_item_id", "") + if not normalized_item_id: + continue + lookup[normalized_item_id] = normalized_row + return lookup + + +def merge_catalog_rows(existing_rows, new_rows): + merged = {} + for row in existing_rows + new_rows: + normalized_row = normalize_catalog_row(row) + catalog_id = normalized_row.get("catalog_id", "") + if catalog_id: + merged[catalog_id] = normalized_row + return sorted(merged.values(), key=lambda row: row["catalog_id"]) + + +def load_link_lookup(link_rows): + lookup = {} + for row in link_rows: + normalized_row = normalize_link_row(row) + normalized_item_id = normalized_row.get("normalized_item_id", "") + if not normalized_item_id: + continue + lookup[normalized_item_id] = normalized_row + return lookup def build_purchase_rows( @@ -237,25 +268,37 @@ def build_purchase_rows( giant_orders, costco_orders, resolution_rows, + link_rows=None, + catalog_rows=None, ): all_enriched_rows = giant_enriched_rows + costco_enriched_rows - ( - observed_rows, - canonical_rows, - link_rows, - observed_id_by_key, - canonical_id_by_observed, - ) = build_link_state(all_enriched_rows) resolution_lookup = load_resolution_lookup(resolution_rows) - for observed_product_id, resolution in resolution_lookup.items(): + link_lookup = load_link_lookup(link_rows or []) + catalog_lookup = { + row["catalog_id"]: normalize_catalog_row(row) + for row in (catalog_rows or []) + if normalize_catalog_row(row).get("catalog_id") + } + + for normalized_item_id, resolution in resolution_lookup.items(): action = resolution.get("resolution_action", "") status = resolution.get("status", "") if status != "approved": continue - if action in {"link", "create"} and resolution.get("canonical_product_id"): - canonical_id_by_observed[observed_product_id] = resolution["canonical_product_id"] + if action in {"link", "create"} and resolution.get("catalog_id"): + link_lookup[normalized_item_id] = { + "normalized_item_id": normalized_item_id, + "catalog_id": resolution["catalog_id"], + "link_method": f"manual_{action}", + "link_confidence": "high", + "review_status": status, + "reviewed_by": "", + "reviewed_at": resolution.get("reviewed_at", ""), + "link_notes": resolution.get("resolution_notes", ""), + } elif action == "exclude": - canonical_id_by_observed[observed_product_id] = "" + link_lookup.pop(normalized_item_id, None) + orders_by_id = {} orders_by_id.update(order_lookup(giant_orders, "giant")) orders_by_id.update(order_lookup(costco_orders, "costco")) @@ -265,24 +308,30 @@ def build_purchase_rows( all_enriched_rows, key=lambda item: (item["order_date"], item["retailer"], item["order_id"], int(item["line_no"])), ): - observed_key = build_observed_products.build_observed_key(row) - observed_product_id = observed_id_by_key.get(observed_key, "") + normalized_item_id = row.get("normalized_item_id", "") + resolution = resolution_lookup.get(normalized_item_id, {}) + link_row = link_lookup.get(normalized_item_id, {}) + catalog_row = catalog_lookup.get(link_row.get("catalog_id", ""), {}) order_row = orders_by_id.get((row["retailer"], row["order_id"]), {}) metrics = derive_metrics(row) - resolution = resolution_lookup.get(observed_product_id, {}) purchase_rows.append( { "purchase_date": row["order_date"], "retailer": row["retailer"], "order_id": row["order_id"], "line_no": row["line_no"], - "observed_item_key": row["observed_item_key"], - "observed_product_id": observed_product_id, - "canonical_product_id": canonical_id_by_observed.get(observed_product_id, ""), + "normalized_row_id": row.get("normalized_row_id", ""), + "normalized_item_id": normalized_item_id, + "catalog_id": link_row.get("catalog_id", ""), "review_status": resolution.get("status", ""), "resolution_action": resolution.get("resolution_action", ""), "raw_item_name": row["item_name"], "normalized_item_name": row["item_name_norm"], + "catalog_name": catalog_row.get("catalog_name", ""), + "category": catalog_row.get("category", ""), + "product_type": catalog_row.get("product_type", ""), + "brand": catalog_row.get("brand", ""), + "variant": catalog_row.get("variant", ""), "image_url": row.get("image_url", ""), "retailer_item_id": row["retailer_item_id"], "upc": row["upc"], @@ -307,33 +356,7 @@ def build_purchase_rows( **metrics, } ) - return purchase_rows, observed_rows, canonical_rows, link_rows - - -def apply_manual_resolutions_to_links(link_rows, resolution_rows): - link_by_observed = {row["observed_product_id"]: dict(row) for row in link_rows} - for resolution in resolution_rows: - if resolution.get("status") != "approved": - continue - observed_product_id = resolution.get("observed_product_id", "") - action = resolution.get("resolution_action", "") - if not observed_product_id: - continue - if action == "exclude": - link_by_observed.pop(observed_product_id, None) - continue - if action in {"link", "create"} and resolution.get("canonical_product_id"): - link_by_observed[observed_product_id] = { - "observed_product_id": observed_product_id, - "canonical_product_id": resolution["canonical_product_id"], - "link_method": f"manual_{action}", - "link_confidence": "high", - "review_status": resolution.get("status", ""), - "reviewed_by": "", - "reviewed_at": resolution.get("reviewed_at", ""), - "link_notes": resolution.get("resolution_notes", ""), - } - return sorted(link_by_observed.values(), key=lambda row: row["observed_product_id"]) + return purchase_rows, sorted(link_lookup.values(), key=lambda row: row["normalized_item_id"]) def build_comparison_examples(purchase_rows): @@ -342,7 +365,7 @@ def build_comparison_examples(purchase_rows): for row in purchase_rows: if row.get("normalized_item_name") != "BANANA": continue - if not row.get("canonical_product_id"): + if not row.get("catalog_id"): continue if row["retailer"] == "giant" and row.get("price_per_lb"): giant_banana = row @@ -355,7 +378,7 @@ def build_comparison_examples(purchase_rows): return [ { "example_name": "banana_price_per_lb", - "canonical_product_id": giant_banana["canonical_product_id"], + "catalog_id": giant_banana["catalog_id"], "giant_purchase_date": giant_banana["purchase_date"], "giant_raw_item_name": giant_banana["raw_item_name"], "giant_price_per_lb": giant_banana["price_per_lb"], @@ -389,27 +412,29 @@ def main( examples_csv, ): resolution_rows = read_optional_csv_rows(resolutions_csv) - purchase_rows, _observed_rows, canonical_rows, link_rows = build_purchase_rows( + catalog_rows = merge_catalog_rows( + [row for row in read_optional_csv_rows(catalog_csv) if is_review_first_catalog_row(row)], + [], + ) + existing_links = [normalize_link_row(row) for row in read_optional_csv_rows(links_csv)] + purchase_rows, link_rows = build_purchase_rows( read_csv_rows(giant_items_enriched_csv), read_csv_rows(costco_items_enriched_csv), read_csv_rows(giant_orders_csv), read_csv_rows(costco_orders_csv), resolution_rows, + existing_links, + catalog_rows, ) - existing_catalog_rows = read_optional_csv_rows(catalog_csv) - merged_catalog_rows = merge_catalog_rows( - existing_catalog_rows, - [catalog_row_from_canonical(row) for row in canonical_rows], - ) - link_rows = apply_manual_resolutions_to_links(link_rows, resolution_rows) example_rows = build_comparison_examples(purchase_rows) - write_csv_rows(catalog_csv, merged_catalog_rows, CATALOG_FIELDS) - write_csv_rows(links_csv, link_rows, build_canonical_layer.LINK_FIELDS) + write_csv_rows(catalog_csv, catalog_rows, CATALOG_FIELDS) + write_csv_rows(links_csv, link_rows, PRODUCT_LINK_FIELDS) write_csv_rows(output_csv, purchase_rows, PURCHASE_FIELDS) write_csv_rows(examples_csv, example_rows, EXAMPLE_FIELDS) click.echo( f"wrote {len(purchase_rows)} purchase rows to {output_csv}, " - f"{len(merged_catalog_rows)} catalog rows to {catalog_csv}, " + f"{len(catalog_rows)} catalog rows to {catalog_csv}, " + f"{len(link_rows)} product links to {links_csv}, " f"and {len(example_rows)} comparison examples to {examples_csv}" ) diff --git a/report_pipeline_status.py b/report_pipeline_status.py index 5dcf8f7..e7efe79 100644 --- a/report_pipeline_status.py +++ b/report_pipeline_status.py @@ -3,7 +3,6 @@ from pathlib import Path import click -import build_observed_products import build_purchases import review_products from layer_helpers import read_csv_rows, write_csv_rows @@ -29,33 +28,36 @@ def build_status_summary( purchases, resolutions, ): - enriched_rows = giant_enriched + costco_enriched - observed_rows = build_observed_products.build_observed_products(enriched_rows) + normalized_rows = giant_enriched + costco_enriched queue_rows = review_products.build_review_queue(purchases, resolutions) + queue_ids = {row["normalized_item_id"] for row in queue_rows} unresolved_purchase_rows = [ row for row in purchases - if row.get("observed_product_id") - and not row.get("canonical_product_id") + if row.get("normalized_item_id") + and not row.get("catalog_id") and row.get("is_fee") != "true" and row.get("is_discount_line") != "true" and row.get("is_coupon_line") != "true" ] - excluded_rows = [ - row - for row in purchases - if row.get("resolution_action") == "exclude" - ] - linked_purchase_rows = [row for row in purchases if row.get("canonical_product_id")] + excluded_rows = [row for row in purchases if row.get("resolution_action") == "exclude"] + linked_purchase_rows = [row for row in purchases if row.get("catalog_id")] + distinct_normalized_items = { + row["normalized_item_id"] for row in normalized_rows if row.get("normalized_item_id") + } + linked_normalized_items = { + row["normalized_item_id"] for row in purchases if row.get("normalized_item_id") and row.get("catalog_id") + } summary = [ {"stage": "raw_orders", "count": len(giant_orders) + len(costco_orders)}, {"stage": "raw_items", "count": len(giant_items) + len(costco_items)}, - {"stage": "enriched_items", "count": len(enriched_rows)}, - {"stage": "observed_products", "count": len(observed_rows)}, - {"stage": "review_queue_observed_products", "count": len(queue_rows)}, - {"stage": "canonical_linked_purchase_rows", "count": len(linked_purchase_rows)}, + {"stage": "normalized_items", "count": len(normalized_rows)}, + {"stage": "distinct_normalized_items", "count": len(distinct_normalized_items)}, + {"stage": "review_queue_normalized_items", "count": len(queue_rows)}, + {"stage": "linked_normalized_items", "count": len(linked_normalized_items)}, + {"stage": "linked_purchase_rows", "count": len(linked_purchase_rows)}, {"stage": "final_purchase_rows", "count": len(purchases)}, {"stage": "unresolved_purchase_rows", "count": len(unresolved_purchase_rows)}, {"stage": "excluded_purchase_rows", "count": len(excluded_rows)}, @@ -65,8 +67,7 @@ def build_status_summary( [ row for row in unresolved_purchase_rows - if row.get("observed_product_id") - not in {queue_row["observed_product_id"] for queue_row in queue_rows} + if row.get("normalized_item_id") not in queue_ids ] ), }, @@ -105,7 +106,7 @@ def main( read_rows_if_exists(costco_items_csv), read_rows_if_exists(costco_enriched_csv), read_rows_if_exists(purchases_csv), - read_rows_if_exists(resolutions_csv), + [build_purchases.normalize_resolution_row(row) for row in read_rows_if_exists(resolutions_csv)], ) write_csv_rows(summary_csv, summary_rows, SUMMARY_FIELDS) summary_json_path = Path(summary_json) diff --git a/review_products.py b/review_products.py index b549418..f84f213 100644 --- a/review_products.py +++ b/review_products.py @@ -10,8 +10,8 @@ from layer_helpers import compact_join, stable_id, write_csv_rows QUEUE_FIELDS = [ "review_id", "retailer", - "observed_product_id", - "canonical_product_id", + "normalized_item_id", + "catalog_id", "reason_code", "priority", "raw_item_names", @@ -26,36 +26,49 @@ QUEUE_FIELDS = [ "updated_at", ] +INFO_COLOR = "cyan" +PROMPT_COLOR = "bright_yellow" +WARNING_COLOR = "magenta" + def build_review_queue(purchase_rows, resolution_rows): - by_observed = defaultdict(list) + by_normalized = defaultdict(list) resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows) for row in purchase_rows: - observed_product_id = row.get("observed_product_id", "") - if not observed_product_id: + normalized_item_id = row.get("normalized_item_id", "") + if not normalized_item_id: continue - by_observed[observed_product_id].append(row) + by_normalized[normalized_item_id].append(row) today_text = str(date.today()) queue_rows = [] - for observed_product_id, rows in sorted(by_observed.items()): - current_resolution = resolution_lookup.get(observed_product_id, {}) + for normalized_item_id, rows in sorted(by_normalized.items()): + current_resolution = resolution_lookup.get(normalized_item_id, {}) if current_resolution.get("status") == "approved": continue - unresolved_rows = [row for row in rows if not row.get("canonical_product_id")] + + unresolved_rows = [ + row + for row in rows + if not row.get("catalog_id") + and row.get("is_item", "true") != "false" + and row.get("is_fee") != "true" + and row.get("is_discount_line") != "true" + and row.get("is_coupon_line") != "true" + ] if not unresolved_rows: continue retailers = sorted({row["retailer"] for row in rows}) - review_id = stable_id("rvw", observed_product_id) + review_id = stable_id("rvw", normalized_item_id) queue_rows.append( { "review_id": review_id, "retailer": " | ".join(retailers), - "observed_product_id": observed_product_id, - "canonical_product_id": current_resolution.get("canonical_product_id", ""), - "reason_code": "missing_canonical_link", + "normalized_item_id": normalized_item_id, + "catalog_id": current_resolution.get("catalog_id", ""), + "reason_code": "missing_catalog_link", "priority": "high", "raw_item_names": compact_join( sorted({row["raw_item_name"] for row in rows if row["raw_item_name"]}), @@ -98,11 +111,6 @@ def save_catalog_rows(path, rows): write_csv_rows(path, rows, build_purchases.CATALOG_FIELDS) -INFO_COLOR = "cyan" -PROMPT_COLOR = "bright_yellow" -WARNING_COLOR = "magenta" - - def sort_related_items(rows): return sorted( rows, @@ -115,7 +123,7 @@ def sort_related_items(rows): ) -def build_canonical_suggestions(related_rows, catalog_rows, limit=3): +def build_catalog_suggestions(related_rows, purchase_rows, catalog_rows, limit=3): normalized_names = { row.get("normalized_item_name", "").strip().upper() for row in related_rows @@ -126,56 +134,52 @@ def build_canonical_suggestions(related_rows, catalog_rows, limit=3): for row in related_rows if row.get("upc", "").strip() } + catalog_by_id = { + row.get("catalog_id", ""): row for row in catalog_rows if row.get("catalog_id", "") + } suggestions = [] seen_ids = set() - def add_matches(rows, reason): - for row in rows: - canonical_product_id = row.get("canonical_product_id", "") - if not canonical_product_id or canonical_product_id in seen_ids: - continue - seen_ids.add(canonical_product_id) - suggestions.append( - { - "canonical_product_id": canonical_product_id, - "canonical_name": row.get("canonical_name", ""), - "reason": reason, - } - ) - if len(suggestions) >= limit: - return True - return False + def add_catalog_id(catalog_id, reason): + if not catalog_id or catalog_id in seen_ids or catalog_id not in catalog_by_id: + return False + seen_ids.add(catalog_id) + catalog_row = catalog_by_id[catalog_id] + suggestions.append( + { + "catalog_id": catalog_id, + "catalog_name": catalog_row.get("catalog_name", ""), + "reason": reason, + } + ) + return len(suggestions) >= limit - exact_upc_rows = [ - row - for row in catalog_rows - if row.get("upc", "").strip() and row.get("upc", "").strip() in upcs + reviewed_purchase_rows = [ + row for row in purchase_rows if row.get("catalog_id") and row.get("normalized_item_id") ] - if add_matches(exact_upc_rows, "exact upc"): - return suggestions + for row in reviewed_purchase_rows: + if row.get("upc", "").strip() and row.get("upc", "").strip() in upcs: + if add_catalog_id(row.get("catalog_id", ""), "exact upc"): + return suggestions - exact_name_rows = [ - row - for row in catalog_rows - if row.get("canonical_name", "").strip().upper() in normalized_names - ] - if add_matches(exact_name_rows, "exact normalized name"): - return suggestions + for row in reviewed_purchase_rows: + if row.get("normalized_item_name", "").strip().upper() in normalized_names: + if add_catalog_id(row.get("catalog_id", ""), "exact normalized name"): + return suggestions - contains_rows = [] - for row in catalog_rows: - canonical_name = row.get("canonical_name", "").strip().upper() - if not canonical_name: + for catalog_row in catalog_rows: + catalog_name = catalog_row.get("catalog_name", "").strip().upper() + if not catalog_name: continue for normalized_name in normalized_names: - if normalized_name in canonical_name or canonical_name in normalized_name: - contains_rows.append(row) + if normalized_name in catalog_name or catalog_name in normalized_name: + if add_catalog_id(catalog_row.get("catalog_id", ""), "catalog name contains match"): + return suggestions break - add_matches(contains_rows, "canonical name contains match") return suggestions -def build_display_lines(queue_row, related_rows): +def build_display_lines(related_rows): lines = [] for index, row in enumerate(sort_related_items(related_rows), start=1): lines.append( @@ -197,41 +201,38 @@ def build_display_lines(queue_row, related_rows): return lines -def observed_name(queue_row, related_rows): +def normalized_label(queue_row, related_rows): if queue_row.get("normalized_names"): return queue_row["normalized_names"].split(" | ")[0] for row in related_rows: if row.get("normalized_item_name"): return row["normalized_item_name"] - return queue_row.get("observed_product_id", "") + return queue_row.get("normalized_item_id", "") -def choose_existing_canonical(display_rows, observed_label, matched_count): +def choose_existing_catalog(display_rows, normalized_name, matched_count): click.secho( - f"Select the canonical_name to associate {matched_count} items with:", + f"Select the catalog_name to associate {matched_count} items with:", fg=INFO_COLOR, ) for index, row in enumerate(display_rows, start=1): - click.echo(f" [{index}] {row['canonical_name']} | {row['canonical_product_id']}") + click.echo(f" [{index}] {row['catalog_name']} | {row['catalog_id']}") choice = click.prompt( click.style("selection", fg=PROMPT_COLOR), type=click.IntRange(1, len(display_rows)), ) chosen_row = display_rows[choice - 1] click.echo( - f'{matched_count} "{observed_label}" items and future matches will be associated ' - f'with "{chosen_row["canonical_name"]}".' - ) - click.secho( - "actions: [y]es [n]o [b]ack [s]kip [q]uit", - fg=PROMPT_COLOR, + f'{matched_count} "{normalized_name}" items and future matches will be associated ' + f'with "{chosen_row["catalog_name"]}".' ) + click.secho("actions: [y]es [n]o [b]ack [s]kip [q]uit", fg=PROMPT_COLOR) confirm = click.prompt( click.style("confirm", fg=PROMPT_COLOR), type=click.Choice(["y", "n", "b", "s", "q"]), ) if confirm == "y": - return chosen_row["canonical_product_id"], "" + return chosen_row["catalog_id"], "" if confirm == "s": return "", "skip" if confirm == "q": @@ -239,54 +240,43 @@ def choose_existing_canonical(display_rows, observed_label, matched_count): return "", "back" -def prompt_resolution(queue_row, related_rows, catalog_rows, queue_index, queue_total): - suggestions = build_canonical_suggestions(related_rows, catalog_rows) - observed_label = observed_name(queue_row, related_rows) +def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total): + suggestions = build_catalog_suggestions(related_rows, purchase_rows, catalog_rows) + normalized_name = normalized_label(queue_row, related_rows) matched_count = len(related_rows) click.echo("") click.secho( - f"Review {queue_index}/{queue_total}: Resolve observed_product {observed_label} " - "to canonical_name [__]?", + f"Review {queue_index}/{queue_total}: Resolve normalized_item {normalized_name} " + "to catalog_name [__]?", fg=INFO_COLOR, ) click.echo(f"{matched_count} matched items:") - for line in build_display_lines(queue_row, related_rows): + for line in build_display_lines(related_rows): click.echo(line) if suggestions: - click.echo(f"{len(suggestions)} canonical suggestions found:") + click.echo(f"{len(suggestions)} catalog_name suggestions found:") for index, suggestion in enumerate(suggestions, start=1): - click.echo(f" [{index}] {suggestion['canonical_name']}") + click.echo(f" [{index}] {suggestion['catalog_name']}") else: - click.echo("no canonical_name suggestions found") - click.secho( - "[l]ink existing [n]ew canonical e[x]clude [s]kip [q]uit:", - fg=PROMPT_COLOR, - ) - action = click.prompt( - "", - type=click.Choice(["l", "n", "x", "s", "q"]), - prompt_suffix=" ", - ) + click.echo("no catalog_name suggestions found") + click.secho("[l]ink existing [n]ew catalog e[x]clude [s]kip [q]uit:", fg=PROMPT_COLOR) + action = click.prompt("", type=click.Choice(["l", "n", "x", "s", "q"]), prompt_suffix=" ") if action == "q": return None, None if action == "s": return { - "observed_product_id": queue_row["observed_product_id"], - "canonical_product_id": "", + "normalized_item_id": queue_row["normalized_item_id"], + "catalog_id": "", "resolution_action": "skip", "status": "pending", "resolution_notes": queue_row.get("resolution_notes", ""), "reviewed_at": str(date.today()), }, None if action == "x": - notes = click.prompt( - click.style("exclude notes", fg=PROMPT_COLOR), - default="", - show_default=False, - ) + notes = click.prompt(click.style("exclude notes", fg=PROMPT_COLOR), default="", show_default=False) return { - "observed_product_id": queue_row["observed_product_id"], - "canonical_product_id": "", + "normalized_item_id": queue_row["normalized_item_id"], + "catalog_id": "", "resolution_action": "exclude", "status": "approved", "resolution_notes": notes, @@ -295,22 +285,19 @@ def prompt_resolution(queue_row, related_rows, catalog_rows, queue_index, queue_ if action == "l": display_rows = suggestions or [ { - "canonical_product_id": row["canonical_product_id"], - "canonical_name": row["canonical_name"], + "catalog_id": row["catalog_id"], + "catalog_name": row["catalog_name"], "reason": "catalog sample", } for row in catalog_rows[:10] + if row.get("catalog_id") ] while True: - canonical_product_id, outcome = choose_existing_canonical( - display_rows, - observed_label, - matched_count, - ) + catalog_id, outcome = choose_existing_catalog(display_rows, normalized_name, matched_count) if outcome == "skip": return { - "observed_product_id": queue_row["observed_product_id"], - "canonical_product_id": "", + "normalized_item_id": queue_row["normalized_item_id"], + "catalog_id": "", "resolution_action": "skip", "status": "pending", "resolution_notes": queue_row.get("resolution_notes", ""), @@ -323,34 +310,22 @@ def prompt_resolution(queue_row, related_rows, catalog_rows, queue_index, queue_ break notes = click.prompt(click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False) return { - "observed_product_id": queue_row["observed_product_id"], - "canonical_product_id": canonical_product_id, + "normalized_item_id": queue_row["normalized_item_id"], + "catalog_id": catalog_id, "resolution_action": "link", "status": "approved", "resolution_notes": notes, "reviewed_at": str(date.today()), }, None - canonical_name = click.prompt(click.style("canonical name", fg=PROMPT_COLOR), type=str) - category = click.prompt( - click.style("category", fg=PROMPT_COLOR), - default="", - show_default=False, - ) - product_type = click.prompt( - click.style("product type", fg=PROMPT_COLOR), - default="", - show_default=False, - ) - notes = click.prompt( - click.style("notes", fg=PROMPT_COLOR), - default="", - show_default=False, - ) - canonical_product_id = stable_id("gcan", f"manual|{canonical_name}|{category}|{product_type}") - canonical_row = { - "canonical_product_id": canonical_product_id, - "canonical_name": canonical_name, + catalog_name = click.prompt(click.style("catalog name", fg=PROMPT_COLOR), type=str) + category = click.prompt(click.style("category", fg=PROMPT_COLOR), default="", show_default=False) + product_type = click.prompt(click.style("product type", fg=PROMPT_COLOR), default="", show_default=False) + notes = click.prompt(click.style("notes", fg=PROMPT_COLOR), default="", show_default=False) + catalog_id = stable_id("cat", f"manual|{catalog_name}|{category}|{product_type}") + catalog_row = { + "catalog_id": catalog_id, + "catalog_name": catalog_name, "category": category, "product_type": product_type, "brand": "", @@ -364,14 +339,14 @@ def prompt_resolution(queue_row, related_rows, catalog_rows, queue_index, queue_ "updated_at": str(date.today()), } resolution_row = { - "observed_product_id": queue_row["observed_product_id"], - "canonical_product_id": canonical_product_id, + "normalized_item_id": queue_row["normalized_item_id"], + "catalog_id": catalog_id, "resolution_action": "create", "status": "approved", "resolution_notes": notes, "reviewed_at": str(date.today()), } - return resolution_row, canonical_row + return resolution_row, catalog_row @click.command() @@ -384,7 +359,7 @@ def prompt_resolution(queue_row, related_rows, catalog_rows, queue_index, queue_ def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, limit, refresh_only): purchase_rows = build_purchases.read_optional_csv_rows(purchases_csv) resolution_rows = build_purchases.read_optional_csv_rows(resolutions_csv) - catalog_rows = build_purchases.read_optional_csv_rows(catalog_csv) + catalog_rows = build_purchases.merge_catalog_rows(build_purchases.read_optional_csv_rows(catalog_csv), []) queue_rows = build_review_queue(purchase_rows, resolution_rows) write_csv_rows(queue_csv, queue_rows, QUEUE_FIELDS) click.echo(f"wrote {len(queue_rows)} rows to {queue_csv}") @@ -393,29 +368,33 @@ def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, limit, refresh_ return resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows) - catalog_by_id = {row["canonical_product_id"]: row for row in catalog_rows if row.get("canonical_product_id")} - rows_by_observed = defaultdict(list) + catalog_by_id = {row["catalog_id"]: row for row in catalog_rows if row.get("catalog_id")} + rows_by_normalized = defaultdict(list) for row in purchase_rows: - observed_product_id = row.get("observed_product_id", "") - if observed_product_id: - rows_by_observed[observed_product_id].append(row) + normalized_item_id = row.get("normalized_item_id", "") + if normalized_item_id: + rows_by_normalized[normalized_item_id].append(row) + reviewed = 0 for index, queue_row in enumerate(queue_rows, start=1): if limit and reviewed >= limit: break - related_rows = rows_by_observed.get(queue_row["observed_product_id"], []) - result = prompt_resolution(queue_row, related_rows, catalog_rows, index, len(queue_rows)) + related_rows = rows_by_normalized.get(queue_row["normalized_item_id"], []) + result = prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, index, len(queue_rows)) if result == (None, None): break - resolution_row, canonical_row = result - resolution_lookup[resolution_row["observed_product_id"]] = resolution_row - if canonical_row and canonical_row["canonical_product_id"] not in catalog_by_id: - catalog_by_id[canonical_row["canonical_product_id"]] = canonical_row - catalog_rows.append(canonical_row) + resolution_row, catalog_row = result + resolution_lookup[resolution_row["normalized_item_id"]] = resolution_row + if catalog_row and catalog_row["catalog_id"] not in catalog_by_id: + catalog_by_id[catalog_row["catalog_id"]] = catalog_row + catalog_rows.append(catalog_row) reviewed += 1 - save_resolution_rows(resolutions_csv, sorted(resolution_lookup.values(), key=lambda row: row["observed_product_id"])) - save_catalog_rows(catalog_csv, sorted(catalog_by_id.values(), key=lambda row: row["canonical_product_id"])) + save_resolution_rows( + resolutions_csv, + sorted(resolution_lookup.values(), key=lambda row: row["normalized_item_id"]), + ) + save_catalog_rows(catalog_csv, sorted(catalog_by_id.values(), key=lambda row: row["catalog_id"])) click.echo( f"saved {len(resolution_lookup)} resolution rows to {resolutions_csv} " f"and {len(catalog_by_id)} catalog rows to {catalog_csv}" diff --git a/tests/test_pipeline_status.py b/tests/test_pipeline_status.py index 573608d..7b3127e 100644 --- a/tests/test_pipeline_status.py +++ b/tests/test_pipeline_status.py @@ -13,6 +13,7 @@ class PipelineStatusTests(unittest.TestCase): "retailer": "giant", "order_id": "g1", "line_no": "1", + "normalized_item_id": "gnorm_banana", "item_name_norm": "BANANA", "item_name": "FRESH BANANA", "retailer_item_id": "1", @@ -37,8 +38,8 @@ class PipelineStatusTests(unittest.TestCase): costco_enriched=[], purchases=[ { - "observed_product_id": "gobs_banana", - "canonical_product_id": "gcan_banana", + "normalized_item_id": "gnorm_banana", + "catalog_id": "cat_banana", "resolution_action": "", "is_fee": "false", "is_discount_line": "false", @@ -50,8 +51,8 @@ class PipelineStatusTests(unittest.TestCase): "line_total": "1.29", }, { - "observed_product_id": "gobs_lime", - "canonical_product_id": "", + "normalized_item_id": "cnorm_lime", + "catalog_id": "", "resolution_action": "", "is_fee": "false", "is_discount_line": "false", @@ -69,10 +70,10 @@ class PipelineStatusTests(unittest.TestCase): counts = {row["stage"]: row["count"] for row in summary} self.assertEqual(1, counts["raw_orders"]) self.assertEqual(1, counts["raw_items"]) - self.assertEqual(1, counts["enriched_items"]) - self.assertEqual(1, counts["canonical_linked_purchase_rows"]) + self.assertEqual(1, counts["normalized_items"]) + self.assertEqual(1, counts["linked_purchase_rows"]) self.assertEqual(1, counts["unresolved_purchase_rows"]) - self.assertEqual(1, counts["review_queue_observed_products"]) + self.assertEqual(1, counts["review_queue_normalized_items"]) self.assertEqual(0, counts["unresolved_not_in_review_rows"]) diff --git a/tests/test_purchases.py b/tests/test_purchases.py index 1b6cb08..0d48c5e 100644 --- a/tests/test_purchases.py +++ b/tests/test_purchases.py @@ -29,7 +29,7 @@ class PurchaseLogTests(unittest.TestCase): self.assertEqual("0.125", metrics["price_per_oz"]) self.assertEqual("picked_weight_lb", metrics["price_per_lb_basis"]) - def test_build_purchase_rows_maps_canonical_ids(self): + def test_build_purchase_rows_maps_catalog_ids(self): fieldnames = enrich_costco.OUTPUT_FIELDS giant_row = {field: "" for field in fieldnames} giant_row.update( @@ -37,7 +37,8 @@ class PurchaseLogTests(unittest.TestCase): "retailer": "giant", "order_id": "g1", "line_no": "1", - "observed_item_key": "giant:g1:1", + "normalized_row_id": "giant:g1:1", + "normalized_item_id": "gnorm:banana", "order_date": "2026-03-01", "item_name": "FRESH BANANA", "item_name_norm": "BANANA", @@ -50,7 +51,7 @@ class PurchaseLogTests(unittest.TestCase): "unit_price": "1.29", "measure_type": "weight", "price_per_lb": "1.29", - "raw_order_path": "giant_output/raw/g1.json", + "raw_order_path": "data/giant-web/raw/g1.json", "is_discount_line": "false", "is_coupon_line": "false", "is_fee": "false", @@ -62,7 +63,8 @@ class PurchaseLogTests(unittest.TestCase): "retailer": "costco", "order_id": "c1", "line_no": "1", - "observed_item_key": "costco:c1:1", + "normalized_row_id": "costco:c1:1", + "normalized_item_id": "cnorm:banana", "order_date": "2026-03-12", "item_name": "BANANAS 3 LB / 1.36 KG", "item_name_norm": "BANANA", @@ -75,7 +77,7 @@ class PurchaseLogTests(unittest.TestCase): "size_unit": "lb", "measure_type": "weight", "price_per_lb": "0.9933", - "raw_order_path": "costco_output/raw/c1.json", + "raw_order_path": "data/costco-web/raw/c1.json", "is_discount_line": "false", "is_coupon_line": "false", "is_fee": "false", @@ -99,17 +101,58 @@ class PurchaseLogTests(unittest.TestCase): "store_state": "VA", } ] + catalog_rows = [ + { + "catalog_id": "cat_banana", + "catalog_name": "BANANA", + "category": "produce", + "product_type": "banana", + "brand": "", + "variant": "", + "size_value": "", + "size_unit": "", + "pack_qty": "", + "measure_type": "", + "notes": "", + "created_at": "", + "updated_at": "", + } + ] + link_rows = [ + { + "normalized_item_id": "gnorm:banana", + "catalog_id": "cat_banana", + "link_method": "manual_link", + "link_confidence": "high", + "review_status": "approved", + "reviewed_by": "", + "reviewed_at": "", + "link_notes": "", + }, + { + "normalized_item_id": "cnorm:banana", + "catalog_id": "cat_banana", + "link_method": "manual_link", + "link_confidence": "high", + "review_status": "approved", + "reviewed_by": "", + "reviewed_at": "", + "link_notes": "", + }, + ] - rows, _observed, _canon, _links = build_purchases.build_purchase_rows( + rows, _links = build_purchases.build_purchase_rows( [giant_row], [costco_row], giant_orders, costco_orders, [], + link_rows, + catalog_rows, ) self.assertEqual(2, len(rows)) - self.assertTrue(all(row["canonical_product_id"] for row in rows)) + self.assertTrue(all(row["catalog_id"] == "cat_banana" for row in rows)) self.assertEqual({"giant", "costco"}, {row["retailer"] for row in rows}) self.assertEqual("https://example.test/banana.jpg", rows[0]["image_url"]) @@ -120,10 +163,10 @@ class PurchaseLogTests(unittest.TestCase): giant_orders = Path(tmpdir) / "giant_orders.csv" costco_orders = Path(tmpdir) / "costco_orders.csv" resolutions_csv = Path(tmpdir) / "review_resolutions.csv" - catalog_csv = Path(tmpdir) / "canonical_catalog.csv" + catalog_csv = Path(tmpdir) / "catalog.csv" links_csv = Path(tmpdir) / "product_links.csv" - purchases_csv = Path(tmpdir) / "combined" / "purchases.csv" - examples_csv = Path(tmpdir) / "combined" / "comparison_examples.csv" + purchases_csv = Path(tmpdir) / "review" / "purchases.csv" + examples_csv = Path(tmpdir) / "review" / "comparison_examples.csv" fieldnames = enrich_costco.OUTPUT_FIELDS giant_row = {field: "" for field in fieldnames} @@ -132,7 +175,8 @@ class PurchaseLogTests(unittest.TestCase): "retailer": "giant", "order_id": "g1", "line_no": "1", - "observed_item_key": "giant:g1:1", + "normalized_row_id": "giant:g1:1", + "normalized_item_id": "gnorm:banana", "order_date": "2026-03-01", "item_name": "FRESH BANANA", "item_name_norm": "BANANA", @@ -144,7 +188,7 @@ class PurchaseLogTests(unittest.TestCase): "unit_price": "1.29", "measure_type": "weight", "price_per_lb": "1.29", - "raw_order_path": "giant_output/raw/g1.json", + "raw_order_path": "data/giant-web/raw/g1.json", "is_discount_line": "false", "is_coupon_line": "false", "is_fee": "false", @@ -156,7 +200,8 @@ class PurchaseLogTests(unittest.TestCase): "retailer": "costco", "order_id": "c1", "line_no": "1", - "observed_item_key": "costco:c1:1", + "normalized_row_id": "costco:c1:1", + "normalized_item_id": "cnorm:banana", "order_date": "2026-03-12", "item_name": "BANANAS 3 LB / 1.36 KG", "item_name_norm": "BANANA", @@ -169,17 +214,14 @@ class PurchaseLogTests(unittest.TestCase): "size_unit": "lb", "measure_type": "weight", "price_per_lb": "0.9933", - "raw_order_path": "costco_output/raw/c1.json", + "raw_order_path": "data/costco-web/raw/c1.json", "is_discount_line": "false", "is_coupon_line": "false", "is_fee": "false", } ) - for path, source_rows in [ - (giant_items, [giant_row]), - (costco_items, [costco_row]), - ]: + for path, source_rows in [(giant_items, [giant_row]), (costco_items, [costco_row])]: with path.open("w", newline="", encoding="utf-8") as handle: writer = csv.DictWriter(handle, fieldnames=fieldnames) writer.writeheader() @@ -217,6 +259,55 @@ class PurchaseLogTests(unittest.TestCase): writer.writeheader() writer.writerows(source_rows) + with catalog_csv.open("w", newline="", encoding="utf-8") as handle: + writer = csv.DictWriter(handle, fieldnames=build_purchases.CATALOG_FIELDS) + writer.writeheader() + writer.writerow( + { + "catalog_id": "cat_banana", + "catalog_name": "BANANA", + "category": "produce", + "product_type": "banana", + "brand": "", + "variant": "", + "size_value": "", + "size_unit": "", + "pack_qty": "", + "measure_type": "", + "notes": "", + "created_at": "", + "updated_at": "", + } + ) + + with links_csv.open("w", newline="", encoding="utf-8") as handle: + writer = csv.DictWriter(handle, fieldnames=build_purchases.PRODUCT_LINK_FIELDS) + writer.writeheader() + writer.writerows( + [ + { + "normalized_item_id": "gnorm:banana", + "catalog_id": "cat_banana", + "link_method": "manual_link", + "link_confidence": "high", + "review_status": "approved", + "reviewed_by": "", + "reviewed_at": "", + "link_notes": "", + }, + { + "normalized_item_id": "cnorm:banana", + "catalog_id": "cat_banana", + "link_method": "manual_link", + "link_confidence": "high", + "review_status": "approved", + "reviewed_by": "", + "reviewed_at": "", + "link_notes": "", + }, + ] + ) + build_purchases.main.callback( giant_items_enriched_csv=str(giant_items), costco_items_enriched_csv=str(costco_items), @@ -246,7 +337,8 @@ class PurchaseLogTests(unittest.TestCase): "retailer": "giant", "order_id": "g1", "line_no": "1", - "observed_item_key": "giant:g1:1", + "normalized_row_id": "giant:g1:1", + "normalized_item_id": "gnorm:ice", "order_date": "2026-03-01", "item_name": "SB BAGGED ICE 20LB", "item_name_norm": "BAGGED ICE", @@ -257,17 +349,14 @@ class PurchaseLogTests(unittest.TestCase): "line_total": "3.50", "unit_price": "3.50", "measure_type": "each", - "raw_order_path": "giant_output/raw/g1.json", + "raw_order_path": "data/giant-web/raw/g1.json", "is_discount_line": "false", "is_coupon_line": "false", "is_fee": "false", } ) - observed_rows, _canonical_rows, _link_rows, _observed_id_by_key, _canonical_by_observed = ( - build_purchases.build_link_state([giant_row]) - ) - observed_product_id = observed_rows[0]["observed_product_id"] - rows, _observed, _canon, _links = build_purchases.build_purchase_rows( + + rows, links = build_purchases.build_purchase_rows( [giant_row], [], [ @@ -282,19 +371,38 @@ class PurchaseLogTests(unittest.TestCase): [], [ { - "observed_product_id": observed_product_id, - "canonical_product_id": "gcan_manual_ice", + "normalized_item_id": "gnorm:ice", + "catalog_id": "cat_ice", "resolution_action": "create", "status": "approved", "resolution_notes": "manual ice merge", "reviewed_at": "2026-03-16", } ], + [], + [ + { + "catalog_id": "cat_ice", + "catalog_name": "ICE", + "category": "frozen", + "product_type": "ice", + "brand": "", + "variant": "", + "size_value": "", + "size_unit": "", + "pack_qty": "", + "measure_type": "", + "notes": "", + "created_at": "", + "updated_at": "", + } + ], ) - self.assertEqual("gcan_manual_ice", rows[0]["canonical_product_id"]) + self.assertEqual("cat_ice", rows[0]["catalog_id"]) self.assertEqual("approved", rows[0]["review_status"]) self.assertEqual("create", rows[0]["resolution_action"]) + self.assertEqual("cat_ice", links[0]["catalog_id"]) if __name__ == "__main__": diff --git a/tests/test_review_workflow.py b/tests/test_review_workflow.py index 5749338..6a597f0 100644 --- a/tests/test_review_workflow.py +++ b/tests/test_review_workflow.py @@ -14,33 +14,39 @@ class ReviewWorkflowTests(unittest.TestCase): queue_rows = review_products.build_review_queue( [ { - "observed_product_id": "gobs_1", - "canonical_product_id": "", + "normalized_item_id": "gnorm_1", + "catalog_id": "", "retailer": "giant", "raw_item_name": "SB BAGGED ICE 20LB", "normalized_item_name": "BAGGED ICE", "upc": "", "line_total": "3.50", + "is_fee": "false", + "is_discount_line": "false", + "is_coupon_line": "false", }, { - "observed_product_id": "gobs_1", - "canonical_product_id": "", + "normalized_item_id": "gnorm_1", + "catalog_id": "", "retailer": "giant", "raw_item_name": "SB BAG ICE CUBED 10LB", "normalized_item_name": "BAG ICE", "upc": "", "line_total": "2.50", + "is_fee": "false", + "is_discount_line": "false", + "is_coupon_line": "false", }, ], [], ) self.assertEqual(1, len(queue_rows)) - self.assertEqual("gobs_1", queue_rows[0]["observed_product_id"]) + self.assertEqual("gnorm_1", queue_rows[0]["normalized_item_id"]) self.assertIn("SB BAGGED ICE 20LB", queue_rows[0]["raw_item_names"]) - def test_build_canonical_suggestions_prefers_upc_then_name(self): - suggestions = review_products.build_canonical_suggestions( + def test_build_catalog_suggestions_prefers_upc_then_name(self): + suggestions = review_products.build_catalog_suggestions( [ { "normalized_item_name": "MIXED PEPPER", @@ -49,36 +55,41 @@ class ReviewWorkflowTests(unittest.TestCase): ], [ { - "canonical_product_id": "gcan_1", - "canonical_name": "MIXED PEPPER", - "upc": "", + "normalized_item_id": "prior_1", + "normalized_item_name": "MIXED PEPPER 6 PACK", + "upc": "12345", + "catalog_id": "cat_2", + } + ], + [ + { + "catalog_id": "cat_1", + "catalog_name": "MIXED PEPPER", }, { - "canonical_product_id": "gcan_2", - "canonical_name": "MIXED PEPPER 6 PACK", - "upc": "12345", + "catalog_id": "cat_2", + "catalog_name": "MIXED PEPPER 6 PACK", }, ], ) - self.assertEqual("gcan_2", suggestions[0]["canonical_product_id"]) + self.assertEqual("cat_2", suggestions[0]["catalog_id"]) self.assertEqual("exact upc", suggestions[0]["reason"]) - self.assertEqual("gcan_1", suggestions[1]["canonical_product_id"]) def test_review_products_displays_position_items_and_suggestions(self): with tempfile.TemporaryDirectory() as tmpdir: purchases_csv = Path(tmpdir) / "purchases.csv" queue_csv = Path(tmpdir) / "review_queue.csv" resolutions_csv = Path(tmpdir) / "review_resolutions.csv" - catalog_csv = Path(tmpdir) / "canonical_catalog.csv" + catalog_csv = Path(tmpdir) / "catalog.csv" purchase_fields = [ "purchase_date", "retailer", "order_id", "line_no", - "observed_product_id", - "canonical_product_id", + "normalized_item_id", + "catalog_id", "raw_item_name", "normalized_item_name", "image_url", @@ -95,8 +106,8 @@ class ReviewWorkflowTests(unittest.TestCase): "retailer": "costco", "order_id": "c2", "line_no": "2", - "observed_product_id": "gobs_mix", - "canonical_product_id": "", + "normalized_item_id": "cnorm_mix", + "catalog_id": "", "raw_item_name": "MIXED PEPPER 6-PACK", "normalized_item_name": "MIXED PEPPER", "image_url": "", @@ -108,14 +119,27 @@ class ReviewWorkflowTests(unittest.TestCase): "retailer": "costco", "order_id": "c1", "line_no": "1", - "observed_product_id": "gobs_mix", - "canonical_product_id": "", + "normalized_item_id": "cnorm_mix", + "catalog_id": "", "raw_item_name": "MIXED PEPPER 6-PACK", "normalized_item_name": "MIXED PEPPER", "image_url": "https://example.test/mixed-pepper.jpg", "upc": "", "line_total": "6.99", }, + { + "purchase_date": "2026-03-10", + "retailer": "giant", + "order_id": "g1", + "line_no": "1", + "normalized_item_id": "gnorm_mix", + "catalog_id": "cat_mix", + "raw_item_name": "MIXED PEPPER", + "normalized_item_name": "MIXED PEPPER", + "image_url": "", + "upc": "", + "line_total": "5.99", + }, ] ) @@ -124,8 +148,8 @@ class ReviewWorkflowTests(unittest.TestCase): writer.writeheader() writer.writerow( { - "canonical_product_id": "gcan_mix", - "canonical_name": "MIXED PEPPER", + "catalog_id": "cat_mix", + "catalog_name": "MIXED PEPPER", "category": "produce", "product_type": "pepper", "brand": "", @@ -158,14 +182,14 @@ class ReviewWorkflowTests(unittest.TestCase): ) self.assertEqual(0, result.exit_code) - self.assertIn("Review 1/1: Resolve observed_product MIXED PEPPER to canonical_name [__]?", result.output) + self.assertIn("Review 1/1: Resolve normalized_item MIXED PEPPER to catalog_name [__]?", result.output) self.assertIn("2 matched items:", result.output) - self.assertIn("[l]ink existing [n]ew canonical e[x]clude [s]kip [q]uit:", result.output) + self.assertIn("[l]ink existing [n]ew catalog e[x]clude [s]kip [q]uit:", result.output) first_item = result.output.index("[1] 2026-03-14 | 7.49") second_item = result.output.index("[2] 2026-03-12 | 6.99") self.assertLess(first_item, second_item) self.assertIn("https://example.test/mixed-pepper.jpg", result.output) - self.assertIn("1 canonical suggestions found:", result.output) + self.assertIn("1 catalog_name suggestions found:", result.output) self.assertIn("[1] MIXED PEPPER", result.output) self.assertIn("\x1b[", result.output) @@ -174,7 +198,7 @@ class ReviewWorkflowTests(unittest.TestCase): purchases_csv = Path(tmpdir) / "purchases.csv" queue_csv = Path(tmpdir) / "review_queue.csv" resolutions_csv = Path(tmpdir) / "review_resolutions.csv" - catalog_csv = Path(tmpdir) / "canonical_catalog.csv" + catalog_csv = Path(tmpdir) / "catalog.csv" with purchases_csv.open("w", newline="", encoding="utf-8") as handle: writer = csv.DictWriter( @@ -184,8 +208,8 @@ class ReviewWorkflowTests(unittest.TestCase): "retailer", "order_id", "line_no", - "observed_product_id", - "canonical_product_id", + "normalized_item_id", + "catalog_id", "raw_item_name", "normalized_item_name", "image_url", @@ -200,8 +224,8 @@ class ReviewWorkflowTests(unittest.TestCase): "retailer": "giant", "order_id": "g1", "line_no": "1", - "observed_product_id": "gobs_ice", - "canonical_product_id": "", + "normalized_item_id": "gnorm_ice", + "catalog_id": "", "raw_item_name": "SB BAGGED ICE 20LB", "normalized_item_name": "BAGGED ICE", "image_url": "", @@ -231,14 +255,14 @@ class ReviewWorkflowTests(unittest.TestCase): ) self.assertEqual(0, result.exit_code) - self.assertIn("no canonical_name suggestions found", result.output) + self.assertIn("no catalog_name suggestions found", result.output) def test_link_existing_uses_numbered_selection_and_confirmation(self): with tempfile.TemporaryDirectory() as tmpdir: purchases_csv = Path(tmpdir) / "purchases.csv" queue_csv = Path(tmpdir) / "review_queue.csv" resolutions_csv = Path(tmpdir) / "review_resolutions.csv" - catalog_csv = Path(tmpdir) / "canonical_catalog.csv" + catalog_csv = Path(tmpdir) / "catalog.csv" with purchases_csv.open("w", newline="", encoding="utf-8") as handle: writer = csv.DictWriter( @@ -248,8 +272,8 @@ class ReviewWorkflowTests(unittest.TestCase): "retailer", "order_id", "line_no", - "observed_product_id", - "canonical_product_id", + "normalized_item_id", + "catalog_id", "raw_item_name", "normalized_item_name", "image_url", @@ -265,8 +289,8 @@ class ReviewWorkflowTests(unittest.TestCase): "retailer": "costco", "order_id": "c2", "line_no": "2", - "observed_product_id": "gobs_mix", - "canonical_product_id": "", + "normalized_item_id": "cnorm_mix", + "catalog_id": "", "raw_item_name": "MIXED PEPPER 6-PACK", "normalized_item_name": "MIXED PEPPER", "image_url": "", @@ -278,14 +302,27 @@ class ReviewWorkflowTests(unittest.TestCase): "retailer": "costco", "order_id": "c1", "line_no": "1", - "observed_product_id": "gobs_mix", - "canonical_product_id": "", + "normalized_item_id": "cnorm_mix", + "catalog_id": "", "raw_item_name": "MIXED PEPPER 6-PACK", "normalized_item_name": "MIXED PEPPER", "image_url": "", "upc": "", "line_total": "6.99", }, + { + "purchase_date": "2026-03-10", + "retailer": "giant", + "order_id": "g1", + "line_no": "1", + "normalized_item_id": "gnorm_mix", + "catalog_id": "cat_mix", + "raw_item_name": "MIXED PEPPER", + "normalized_item_name": "MIXED PEPPER", + "image_url": "", + "upc": "", + "line_total": "5.99", + }, ] ) @@ -294,8 +331,8 @@ class ReviewWorkflowTests(unittest.TestCase): writer.writeheader() writer.writerow( { - "canonical_product_id": "gcan_mix", - "canonical_name": "MIXED PEPPER", + "catalog_id": "cat_mix", + "catalog_name": "MIXED PEPPER", "category": "", "product_type": "", "brand": "", @@ -329,29 +366,29 @@ class ReviewWorkflowTests(unittest.TestCase): ) self.assertEqual(0, result.exit_code) - self.assertIn("Select the canonical_name to associate 2 items with:", result.output) - self.assertIn('[1] MIXED PEPPER | gcan_mix', result.output) + self.assertIn("Select the catalog_name to associate 2 items with:", result.output) + self.assertIn("[1] MIXED PEPPER | cat_mix", result.output) self.assertIn('2 "MIXED PEPPER" items and future matches will be associated with "MIXED PEPPER".', result.output) self.assertIn("actions: [y]es [n]o [b]ack [s]kip [q]uit", result.output) with resolutions_csv.open(newline="", encoding="utf-8") as handle: rows = list(csv.DictReader(handle)) - self.assertEqual("gcan_mix", rows[0]["canonical_product_id"]) + self.assertEqual("cat_mix", rows[0]["catalog_id"]) self.assertEqual("link", rows[0]["resolution_action"]) - def test_review_products_creates_canonical_and_resolution(self): + def test_review_products_creates_catalog_and_resolution(self): with tempfile.TemporaryDirectory() as tmpdir: purchases_csv = Path(tmpdir) / "purchases.csv" queue_csv = Path(tmpdir) / "review_queue.csv" resolutions_csv = Path(tmpdir) / "review_resolutions.csv" - catalog_csv = Path(tmpdir) / "canonical_catalog.csv" + catalog_csv = Path(tmpdir) / "catalog.csv" with purchases_csv.open("w", newline="", encoding="utf-8") as handle: writer = csv.DictWriter( handle, fieldnames=[ "purchase_date", - "observed_product_id", - "canonical_product_id", + "normalized_item_id", + "catalog_id", "retailer", "raw_item_name", "normalized_item_name", @@ -366,8 +403,8 @@ class ReviewWorkflowTests(unittest.TestCase): writer.writerow( { "purchase_date": "2026-03-15", - "observed_product_id": "gobs_ice", - "canonical_product_id": "", + "normalized_item_id": "gnorm_ice", + "catalog_id": "", "retailer": "giant", "raw_item_name": "SB BAGGED ICE 20LB", "normalized_item_name": "BAGGED ICE", @@ -402,7 +439,7 @@ class ReviewWorkflowTests(unittest.TestCase): catalog_rows = list(csv.DictReader(handle)) self.assertEqual("create", resolution_rows[0]["resolution_action"]) self.assertEqual("approved", resolution_rows[0]["status"]) - self.assertEqual("ICE", catalog_rows[0]["canonical_name"]) + self.assertEqual("ICE", catalog_rows[0]["catalog_name"]) if __name__ == "__main__":