From f93b9aa4641f6902cfab197dea07e0f78e1ad56a Mon Sep 17 00:00:00 2001 From: ben Date: Fri, 20 Mar 2026 13:32:20 -0400 Subject: [PATCH] Add catalog search to review flow --- review_products.py | 166 ++++++++++++++++++++++------------ tests/test_review_workflow.py | 122 ++++++++++++++++++++++++- 2 files changed, 225 insertions(+), 63 deletions(-) diff --git a/review_products.py b/review_products.py index 4b3e2a0..2bbd2ee 100644 --- a/review_products.py +++ b/review_products.py @@ -1,5 +1,6 @@ from collections import defaultdict from datetime import date +import re import click @@ -29,6 +30,7 @@ QUEUE_FIELDS = [ INFO_COLOR = "cyan" PROMPT_COLOR = "bright_yellow" WARNING_COLOR = "magenta" +TOKEN_RE = re.compile(r"[A-Z0-9]+") def print_intro_text(): @@ -134,6 +136,13 @@ def sort_related_items(rows): ) +def tokenize_match_text(*values): + tokens = set() + for value in values: + tokens.update(TOKEN_RE.findall((value or "").upper())) + return tokens + + def build_catalog_suggestions(related_rows, purchase_rows, catalog_rows, limit=3): normalized_names = { row.get("normalized_item_name", "").strip().upper() @@ -190,6 +199,55 @@ def build_catalog_suggestions(related_rows, purchase_rows, catalog_rows, limit=3 return suggestions +def search_catalog_rows(query, catalog_rows, purchase_rows, current_normalized_item_id, limit=10): + query_tokens = tokenize_match_text(query) + if not query_tokens: + return [] + + linked_purchase_counts = defaultdict(int) + linked_normalized_ids = defaultdict(set) + current_catalog_id = "" + for row in purchase_rows: + catalog_id = row.get("catalog_id", "") + normalized_item_id = row.get("normalized_item_id", "") + if catalog_id and normalized_item_id: + linked_purchase_counts[catalog_id] += 1 + linked_normalized_ids[catalog_id].add(normalized_item_id) + if normalized_item_id == current_normalized_item_id and catalog_id: + current_catalog_id = catalog_id + + ranked_rows = [] + for row in catalog_rows: + catalog_id = row.get("catalog_id", "") + if not catalog_id or catalog_id == current_catalog_id: + continue + catalog_tokens = tokenize_match_text( + row.get("catalog_name", ""), + row.get("product_type", ""), + row.get("variant", ""), + ) + overlap = query_tokens & catalog_tokens + if not overlap: + continue + ranked_rows.append( + { + "catalog_id": catalog_id, + "catalog_name": row.get("catalog_name", ""), + "product_type": row.get("product_type", ""), + "category": row.get("category", ""), + "variant": row.get("variant", ""), + "linked_normalized_items": len(linked_normalized_ids.get(catalog_id, set())), + "linked_purchase_rows": linked_purchase_counts.get(catalog_id, 0), + "score": len(overlap), + } + ) + + ranked_rows.sort( + key=lambda row: (-row["score"], row["catalog_name"], row["catalog_id"]) + ) + return ranked_rows[:limit] + + def suggestion_display_rows(suggestions, purchase_rows, catalog_rows): linked_purchase_counts = defaultdict(int) linked_normalized_ids = defaultdict(set) @@ -235,6 +293,15 @@ def suggestion_display_rows(suggestions, purchase_rows, catalog_rows): return display_rows +def print_catalog_rows(rows): + for index, row in enumerate(rows, start=1): + click.echo( + f" [{index}] {row['catalog_name']}, {row.get('product_type', '')}, " + f"{row.get('category', '')} ({row['linked_normalized_items']} items, " + f"{row['linked_purchase_rows']} rows)" + ) + + def build_display_lines(related_rows): lines = [] for index, row in enumerate(sort_related_items(related_rows), start=1): @@ -267,12 +334,7 @@ def choose_existing_catalog(display_rows, normalized_name, matched_count): f"Select the catalog_name to associate {matched_count} items with:", fg=INFO_COLOR, ) - for index, row in enumerate(display_rows, start=1): - click.echo( - f" [{index}] {row['catalog_name']}, {row.get('product_type', '')}, " - f"{row.get('category', '')} ({row['linked_normalized_items']} items, " - f"{row['linked_purchase_rows']} rows)" - ) + print_catalog_rows(display_rows) choice = click.prompt( click.style("selection", fg=PROMPT_COLOR), type=click.IntRange(1, len(display_rows)), @@ -314,18 +376,13 @@ def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queu click.echo(line) if suggestions: click.echo(f"{len(suggestions)} catalog_name suggestions found:") - for index, suggestion in enumerate(suggestions, start=1): - click.echo( - f" [{index}] {suggestion['catalog_name']}, {suggestion.get('product_type', '')}, " - f"{suggestion.get('category', '')} ({suggestion['linked_normalized_items']} items, " - f"{suggestion['linked_purchase_rows']} rows)" - ) + print_catalog_rows(suggestions) else: click.echo("no catalog_name suggestions found") prompt_bits = [] if suggestions: prompt_bits.append("[#] link to suggestion") - prompt_bits.extend(["[l]ink existing", "[n]ew", "[s]kip", "e[x]clude", "[q]uit"]) + prompt_bits.extend(["[s]earch", "[n]ew", "e[x]clude", "[q]uit"]) click.secho(" ".join(prompt_bits) + " >", fg=PROMPT_COLOR) action = click.prompt("", type=str, prompt_suffix=" ").strip().lower() if action.isdigit() and suggestions: @@ -346,14 +403,42 @@ def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queu if action == "q": return None, None if action == "s": - return { - "normalized_item_id": queue_row["normalized_item_id"], - "catalog_id": "", - "resolution_action": "skip", - "status": "pending", - "resolution_notes": queue_row.get("resolution_notes", ""), - "reviewed_at": str(date.today()), - }, None + while True: + query = click.prompt(click.style("search", fg=PROMPT_COLOR), default="", show_default=False).strip() + if not query: + return prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total) + search_rows = search_catalog_rows( + query, + catalog_rows, + purchase_rows, + queue_row["normalized_item_id"], + ) + if not search_rows: + click.echo("no matches found") + retry = click.prompt( + click.style("search again? [enter=yes, q=no]", fg=PROMPT_COLOR), + default="", + show_default=False, + ).strip().lower() + if retry == "q": + return prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total) + continue + click.echo(f"{len(search_rows)} search results found:") + print_catalog_rows(search_rows) + choice = click.prompt( + click.style("selection", fg=PROMPT_COLOR), + type=click.IntRange(1, len(search_rows)), + ) + chosen_row = search_rows[choice - 1] + notes = click.prompt(click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False) + return { + "normalized_item_id": queue_row["normalized_item_id"], + "catalog_id": chosen_row["catalog_id"], + "resolution_action": "link", + "status": "approved", + "resolution_notes": notes, + "reviewed_at": str(date.today()), + }, None if action == "x": notes = click.prompt(click.style("exclude notes", fg=PROMPT_COLOR), default="", show_default=False) return { @@ -364,45 +449,6 @@ def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queu "resolution_notes": notes, "reviewed_at": str(date.today()), }, None - if action == "l": - display_rows = suggestions or [ - { - "catalog_id": row["catalog_id"], - "catalog_name": row["catalog_name"], - "reason": "catalog sample", - "product_type": row.get("product_type", ""), - "category": row.get("category", ""), - "linked_normalized_items": 0, - "linked_purchase_rows": 0, - } - for row in catalog_rows[:10] - if row.get("catalog_id") - ] - while True: - catalog_id, outcome = choose_existing_catalog(display_rows, normalized_name, matched_count) - if outcome == "skip": - return { - "normalized_item_id": queue_row["normalized_item_id"], - "catalog_id": "", - "resolution_action": "skip", - "status": "pending", - "resolution_notes": queue_row.get("resolution_notes", ""), - "reviewed_at": str(date.today()), - }, None - if outcome == "quit": - return None, None - if outcome == "back": - continue - break - notes = click.prompt(click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False) - return { - "normalized_item_id": queue_row["normalized_item_id"], - "catalog_id": catalog_id, - "resolution_action": "link", - "status": "approved", - "resolution_notes": notes, - "reviewed_at": str(date.today()), - }, None if action != "n": click.secho("invalid action", fg=WARNING_COLOR) return prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total) diff --git a/tests/test_review_workflow.py b/tests/test_review_workflow.py index 4aa93b1..f0f3e0d 100644 --- a/tests/test_review_workflow.py +++ b/tests/test_review_workflow.py @@ -76,6 +76,37 @@ class ReviewWorkflowTests(unittest.TestCase): self.assertEqual("cat_2", suggestions[0]["catalog_id"]) self.assertEqual("exact upc", suggestions[0]["reason"]) + def test_search_catalog_rows_ranks_token_overlap(self): + results = review_products.search_catalog_rows( + "mixed pepper", + [ + { + "catalog_id": "cat_1", + "catalog_name": "MIXED PEPPER", + "product_type": "pepper", + "category": "produce", + "variant": "", + }, + { + "catalog_id": "cat_2", + "catalog_name": "GROUND PEPPER", + "product_type": "spice", + "category": "baking", + "variant": "", + }, + ], + [ + { + "normalized_item_id": "gnorm_mix", + "catalog_id": "cat_1", + } + ], + "cnorm_mix", + ) + + self.assertEqual("cat_1", results[0]["catalog_id"]) + self.assertGreater(results[0]["score"], results[1]["score"]) + def test_review_products_displays_position_items_and_suggestions(self): with tempfile.TemporaryDirectory() as tmpdir: purchases_csv = Path(tmpdir) / "purchases.csv" @@ -188,7 +219,7 @@ class ReviewWorkflowTests(unittest.TestCase): self.assertIn("Review guide:", result.output) self.assertIn("Review 1/1: MIXED PEPPER", result.output) self.assertIn("2 matched items:", result.output) - self.assertIn("[#] link to suggestion", result.output) + self.assertIn("[#] link to suggestion [s]earch [n]ew e[x]clude [q]uit >", result.output) first_item = result.output.index("[1] MIXED PEPPER 6-PACK | costco | 2026-03-14 | 7.49 | ") second_item = result.output.index("[2] MIXED PEPPER 6-PACK | costco | 2026-03-12 | 6.99 | https://example.test/mixed-pepper.jpg") self.assertLess(first_item, second_item) @@ -263,7 +294,7 @@ class ReviewWorkflowTests(unittest.TestCase): self.assertEqual(0, result.exit_code) self.assertIn("no catalog_name suggestions found", result.output) - def test_link_existing_uses_numbered_selection_and_confirmation(self): + def test_search_links_catalog_and_writes_link_row(self): with tempfile.TemporaryDirectory() as tmpdir: purchases_csv = Path(tmpdir) / "purchases.csv" queue_csv = Path(tmpdir) / "review_queue.csv" @@ -370,11 +401,12 @@ class ReviewWorkflowTests(unittest.TestCase): "--limit", "1", ], - input="1\nlinked by test\n", + input="s\nmixed pepper\n1\nlinked by test\n", color=True, ) self.assertEqual(0, result.exit_code) + self.assertIn("1 search results found:", result.output) with resolutions_csv.open(newline="", encoding="utf-8") as handle: rows = list(csv.DictReader(handle)) with links_csv.open(newline="", encoding="utf-8") as handle: @@ -383,6 +415,90 @@ class ReviewWorkflowTests(unittest.TestCase): self.assertEqual("link", rows[0]["resolution_action"]) self.assertEqual("cat_mix", link_rows[0]["catalog_id"]) + def test_search_no_matches_allows_retry_or_return(self): + with tempfile.TemporaryDirectory() as tmpdir: + purchases_csv = Path(tmpdir) / "purchases.csv" + queue_csv = Path(tmpdir) / "review_queue.csv" + resolutions_csv = Path(tmpdir) / "review_resolutions.csv" + catalog_csv = Path(tmpdir) / "catalog.csv" + links_csv = Path(tmpdir) / "product_links.csv" + + with purchases_csv.open("w", newline="", encoding="utf-8") as handle: + writer = csv.DictWriter( + handle, + fieldnames=[ + "purchase_date", + "retailer", + "order_id", + "line_no", + "normalized_item_id", + "catalog_id", + "raw_item_name", + "normalized_item_name", + "image_url", + "upc", + "line_total", + ], + ) + writer.writeheader() + writer.writerow( + { + "purchase_date": "2026-03-14", + "retailer": "giant", + "order_id": "g1", + "line_no": "1", + "normalized_item_id": "gnorm_ice", + "catalog_id": "", + "raw_item_name": "SB BAGGED ICE 20LB", + "normalized_item_name": "BAGGED ICE", + "image_url": "", + "upc": "", + "line_total": "3.50", + } + ) + + with catalog_csv.open("w", newline="", encoding="utf-8") as handle: + writer = csv.DictWriter(handle, fieldnames=review_products.build_purchases.CATALOG_FIELDS) + writer.writeheader() + writer.writerow( + { + "catalog_id": "cat_ice", + "catalog_name": "ICE", + "category": "frozen", + "product_type": "ice", + "brand": "", + "variant": "", + "size_value": "", + "size_unit": "", + "pack_qty": "", + "measure_type": "", + "notes": "", + "created_at": "", + "updated_at": "", + } + ) + + result = CliRunner().invoke( + review_products.main, + [ + "--purchases-csv", + str(purchases_csv), + "--queue-csv", + str(queue_csv), + "--resolutions-csv", + str(resolutions_csv), + "--catalog-csv", + str(catalog_csv), + "--links-csv", + str(links_csv), + ], + input="s\nzzz\nq\nq\n", + color=True, + ) + + self.assertEqual(0, result.exit_code) + self.assertIn("no matches found", result.output) + def test_review_products_creates_catalog_and_resolution(self): with tempfile.TemporaryDirectory() as tmpdir: purchases_csv = Path(tmpdir) / "purchases.csv"