diff --git a/pm/tasks.org b/pm/tasks.org index 6b32bee..0127cf2 100644 --- a/pm/tasks.org +++ b/pm/tasks.org @@ -677,7 +677,7 @@ replace the old observed/canonical workflow with a review-first pipeline that us - Existing auto-generated catalog rows are no longer carried forward by default; only deliberate catalog entries survive. That keeps the new `catalog.csv` conservative, but it also means prior observed-based auto-links do not migrate into the new model. - Live rerun after the refactor produced `627` purchase rows, `387` review-queue rows, `407` distinct normalized items, `0` linked normalized items, and `0` unresolved rows missing from the review queue. -* [ ] t1.16: cleanup review process and format +* [x] t1.16: cleanup review process and format ** acceptance criteria 1. Add intro text explaining: @@ -709,11 +709,14 @@ replace the old observed/canonical workflow with a review-first pipeline that us ** evidence -- commit: -- tests: -- date: +- commit: pending +- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python review_products.py --refresh-only`; `./venv/bin/python review_products.py --help` +- datetime: 2026-03-20 12:45:25 EDT ** notes +- The main flow change is operational rather than architectural: each review decision now persists immediately to `review_resolutions.csv`, `catalog.csv`, `product_links.csv`, and the on-disk `review_queue.csv`. +- Direct numeric selection works well for suggestion-heavy review, while `[l]ink existing` remains available as a fallback when the suggestion list is empty or incomplete. +- I kept the review data model unchanged from `t1.15`; this task only tightened the prompt format, field order, and save behavior. #+END_* diff --git a/review_products.py b/review_products.py index f84f213..4b3e2a0 100644 --- a/review_products.py +++ b/review_products.py @@ -31,6 +31,13 @@ PROMPT_COLOR = "bright_yellow" WARNING_COLOR = "magenta" +def print_intro_text(): + click.secho("Review guide:", fg=INFO_COLOR) + click.echo(" catalog name: unique product identity including variant, but not packaging") + click.echo(" product type: general product you want to compare across purchases") + click.echo(" category: broad analysis bucket such as dairy, produce, or frozen") + + def build_review_queue(purchase_rows, resolution_rows): by_normalized = defaultdict(list) resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows) @@ -111,6 +118,10 @@ def save_catalog_rows(path, rows): write_csv_rows(path, rows, build_purchases.CATALOG_FIELDS) +def save_link_rows(path, rows): + write_csv_rows(path, rows, build_purchases.PRODUCT_LINK_FIELDS) + + def sort_related_items(rows): return sorted( rows, @@ -179,23 +190,64 @@ def build_catalog_suggestions(related_rows, purchase_rows, catalog_rows, limit=3 return suggestions +def suggestion_display_rows(suggestions, purchase_rows, catalog_rows): + linked_purchase_counts = defaultdict(int) + linked_normalized_ids = defaultdict(set) + for row in purchase_rows: + catalog_id = row.get("catalog_id", "") + normalized_item_id = row.get("normalized_item_id", "") + if not catalog_id or not normalized_item_id: + continue + linked_purchase_counts[catalog_id] += 1 + linked_normalized_ids[catalog_id].add(normalized_item_id) + + display_rows = [] + catalog_details = { + row["catalog_id"]: { + "product_type": row.get("product_type", ""), + "category": row.get("category", ""), + } + for row in catalog_rows + if row.get("catalog_id") + } + for row in purchase_rows: + if row.get("catalog_id"): + catalog_details.setdefault( + row["catalog_id"], + { + "product_type": row.get("product_type", ""), + "category": row.get("category", ""), + }, + ) + + for row in suggestions: + catalog_id = row["catalog_id"] + details = catalog_details.get(catalog_id, {}) + display_rows.append( + { + **row, + "product_type": details.get("product_type", ""), + "category": details.get("category", ""), + "linked_purchase_rows": linked_purchase_counts.get(catalog_id, 0), + "linked_normalized_items": len(linked_normalized_ids.get(catalog_id, set())), + } + ) + return display_rows + + def build_display_lines(related_rows): lines = [] for index, row in enumerate(sort_related_items(related_rows), start=1): lines.append( - " [{index}] {purchase_date} | {line_total} | {raw_item_name} | {normalized_item_name} | " - "{upc} | {retailer}".format( + " [{index}] {raw_item_name} | {retailer} | {purchase_date} | {line_total} | {image_url}".format( index=index, + raw_item_name=row.get("raw_item_name", ""), + retailer=row.get("retailer", ""), purchase_date=row.get("purchase_date", ""), line_total=row.get("line_total", ""), - raw_item_name=row.get("raw_item_name", ""), - normalized_item_name=row.get("normalized_item_name", ""), - upc=row.get("upc", ""), - retailer=row.get("retailer", ""), + image_url=row.get("image_url", ""), ) ) - if row.get("image_url"): - lines.append(f" {row['image_url']}") if not lines: lines.append(" [1] no matched item rows found") return lines @@ -216,7 +268,11 @@ def choose_existing_catalog(display_rows, normalized_name, matched_count): fg=INFO_COLOR, ) for index, row in enumerate(display_rows, start=1): - click.echo(f" [{index}] {row['catalog_name']} | {row['catalog_id']}") + click.echo( + f" [{index}] {row['catalog_name']}, {row.get('product_type', '')}, " + f"{row.get('category', '')} ({row['linked_normalized_items']} items, " + f"{row['linked_purchase_rows']} rows)" + ) choice = click.prompt( click.style("selection", fg=PROMPT_COLOR), type=click.IntRange(1, len(display_rows)), @@ -241,13 +297,16 @@ def choose_existing_catalog(display_rows, normalized_name, matched_count): def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total): - suggestions = build_catalog_suggestions(related_rows, purchase_rows, catalog_rows) + suggestions = suggestion_display_rows( + build_catalog_suggestions(related_rows, purchase_rows, catalog_rows), + purchase_rows, + catalog_rows, + ) normalized_name = normalized_label(queue_row, related_rows) matched_count = len(related_rows) click.echo("") click.secho( - f"Review {queue_index}/{queue_total}: Resolve normalized_item {normalized_name} " - "to catalog_name [__]?", + f"Review {queue_index}/{queue_total}: {normalized_name}", fg=INFO_COLOR, ) click.echo(f"{matched_count} matched items:") @@ -256,11 +315,34 @@ def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queu if suggestions: click.echo(f"{len(suggestions)} catalog_name suggestions found:") for index, suggestion in enumerate(suggestions, start=1): - click.echo(f" [{index}] {suggestion['catalog_name']}") + click.echo( + f" [{index}] {suggestion['catalog_name']}, {suggestion.get('product_type', '')}, " + f"{suggestion.get('category', '')} ({suggestion['linked_normalized_items']} items, " + f"{suggestion['linked_purchase_rows']} rows)" + ) else: click.echo("no catalog_name suggestions found") - click.secho("[l]ink existing [n]ew catalog e[x]clude [s]kip [q]uit:", fg=PROMPT_COLOR) - action = click.prompt("", type=click.Choice(["l", "n", "x", "s", "q"]), prompt_suffix=" ") + prompt_bits = [] + if suggestions: + prompt_bits.append("[#] link to suggestion") + prompt_bits.extend(["[l]ink existing", "[n]ew", "[s]kip", "e[x]clude", "[q]uit"]) + click.secho(" ".join(prompt_bits) + " >", fg=PROMPT_COLOR) + action = click.prompt("", type=str, prompt_suffix=" ").strip().lower() + if action.isdigit() and suggestions: + choice = int(action) + if 1 <= choice <= len(suggestions): + chosen_row = suggestions[choice - 1] + notes = click.prompt(click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False) + return { + "normalized_item_id": queue_row["normalized_item_id"], + "catalog_id": chosen_row["catalog_id"], + "resolution_action": "link", + "status": "approved", + "resolution_notes": notes, + "reviewed_at": str(date.today()), + }, None + click.secho("invalid suggestion number", fg=WARNING_COLOR) + return prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total) if action == "q": return None, None if action == "s": @@ -288,6 +370,10 @@ def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queu "catalog_id": row["catalog_id"], "catalog_name": row["catalog_name"], "reason": "catalog sample", + "product_type": row.get("product_type", ""), + "category": row.get("category", ""), + "linked_normalized_items": 0, + "linked_purchase_rows": 0, } for row in catalog_rows[:10] if row.get("catalog_id") @@ -317,10 +403,13 @@ def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queu "resolution_notes": notes, "reviewed_at": str(date.today()), }, None + if action != "n": + click.secho("invalid action", fg=WARNING_COLOR) + return prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total) catalog_name = click.prompt(click.style("catalog name", fg=PROMPT_COLOR), type=str) - category = click.prompt(click.style("category", fg=PROMPT_COLOR), default="", show_default=False) product_type = click.prompt(click.style("product type", fg=PROMPT_COLOR), default="", show_default=False) + category = click.prompt(click.style("category", fg=PROMPT_COLOR), default="", show_default=False) notes = click.prompt(click.style("notes", fg=PROMPT_COLOR), default="", show_default=False) catalog_id = stable_id("cat", f"manual|{catalog_name}|{category}|{product_type}") catalog_row = { @@ -349,17 +438,41 @@ def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queu return resolution_row, catalog_row +def apply_resolution_to_queue(queue_rows, resolution_lookup): + today_text = str(date.today()) + updated_rows = [] + for row in queue_rows: + resolution = resolution_lookup.get(row["normalized_item_id"], {}) + row_copy = dict(row) + if resolution: + row_copy["catalog_id"] = resolution.get("catalog_id", "") + row_copy["status"] = resolution.get("status", row_copy.get("status", "pending")) + row_copy["resolution_action"] = resolution.get("resolution_action", "") + row_copy["resolution_notes"] = resolution.get("resolution_notes", "") + row_copy["updated_at"] = resolution.get("reviewed_at", today_text) + if resolution.get("status") == "approved": + row_copy["created_at"] = row_copy.get("created_at") or resolution.get("reviewed_at", today_text) + updated_rows.append(row_copy) + return updated_rows + + +def link_rows_from_state(link_lookup): + return sorted(link_lookup.values(), key=lambda row: row["normalized_item_id"]) + + @click.command() @click.option("--purchases-csv", default="data/review/purchases.csv", show_default=True) @click.option("--queue-csv", default="data/review/review_queue.csv", show_default=True) @click.option("--resolutions-csv", default="data/review/review_resolutions.csv", show_default=True) @click.option("--catalog-csv", default="data/catalog.csv", show_default=True) +@click.option("--links-csv", default="data/review/product_links.csv", show_default=True) @click.option("--limit", default=0, show_default=True, type=int) @click.option("--refresh-only", is_flag=True, help="Only rebuild review_queue.csv without prompting.") -def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, limit, refresh_only): +def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, links_csv, limit, refresh_only): purchase_rows = build_purchases.read_optional_csv_rows(purchases_csv) resolution_rows = build_purchases.read_optional_csv_rows(resolutions_csv) catalog_rows = build_purchases.merge_catalog_rows(build_purchases.read_optional_csv_rows(catalog_csv), []) + link_lookup = build_purchases.load_link_lookup(build_purchases.read_optional_csv_rows(links_csv)) queue_rows = build_review_queue(purchase_rows, resolution_rows) write_csv_rows(queue_csv, queue_rows, QUEUE_FIELDS) click.echo(f"wrote {len(queue_rows)} rows to {queue_csv}") @@ -367,6 +480,7 @@ def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, limit, refresh_ if refresh_only: return + print_intro_text() resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows) catalog_by_id = {row["catalog_id"]: row for row in catalog_rows if row.get("catalog_id")} rows_by_normalized = defaultdict(list) @@ -388,16 +502,38 @@ def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, limit, refresh_ if catalog_row and catalog_row["catalog_id"] not in catalog_by_id: catalog_by_id[catalog_row["catalog_id"]] = catalog_row catalog_rows.append(catalog_row) + normalized_item_id = resolution_row["normalized_item_id"] + if resolution_row["status"] == "approved": + if resolution_row["resolution_action"] in {"link", "create"} and resolution_row.get("catalog_id"): + link_lookup[normalized_item_id] = { + "normalized_item_id": normalized_item_id, + "catalog_id": resolution_row["catalog_id"], + "link_method": f"manual_{resolution_row['resolution_action']}", + "link_confidence": "high", + "review_status": "approved", + "reviewed_by": "", + "reviewed_at": resolution_row.get("reviewed_at", ""), + "link_notes": resolution_row.get("resolution_notes", ""), + } + elif resolution_row["resolution_action"] == "exclude": + link_lookup.pop(normalized_item_id, None) + queue_rows = apply_resolution_to_queue(queue_rows, resolution_lookup) + write_csv_rows(queue_csv, queue_rows, QUEUE_FIELDS) + save_resolution_rows( + resolutions_csv, + sorted(resolution_lookup.values(), key=lambda row: row["normalized_item_id"]), + ) + save_catalog_rows(catalog_csv, sorted(catalog_by_id.values(), key=lambda row: row["catalog_id"])) + save_link_rows(links_csv, link_rows_from_state(link_lookup)) reviewed += 1 - save_resolution_rows( - resolutions_csv, - sorted(resolution_lookup.values(), key=lambda row: row["normalized_item_id"]), - ) + save_resolution_rows(resolutions_csv, sorted(resolution_lookup.values(), key=lambda row: row["normalized_item_id"])) save_catalog_rows(catalog_csv, sorted(catalog_by_id.values(), key=lambda row: row["catalog_id"])) + save_link_rows(links_csv, link_rows_from_state(link_lookup)) click.echo( - f"saved {len(resolution_lookup)} resolution rows to {resolutions_csv} " - f"and {len(catalog_by_id)} catalog rows to {catalog_csv}" + f"saved {len(resolution_lookup)} resolution rows to {resolutions_csv}, " + f"{len(catalog_by_id)} catalog rows to {catalog_csv}, " + f"and {len(link_lookup)} product links to {links_csv}" ) diff --git a/tests/test_review_workflow.py b/tests/test_review_workflow.py index 6a597f0..4aa93b1 100644 --- a/tests/test_review_workflow.py +++ b/tests/test_review_workflow.py @@ -82,6 +82,7 @@ class ReviewWorkflowTests(unittest.TestCase): queue_csv = Path(tmpdir) / "review_queue.csv" resolutions_csv = Path(tmpdir) / "review_resolutions.csv" catalog_csv = Path(tmpdir) / "catalog.csv" + links_csv = Path(tmpdir) / "product_links.csv" purchase_fields = [ "purchase_date", @@ -176,21 +177,23 @@ class ReviewWorkflowTests(unittest.TestCase): str(resolutions_csv), "--catalog-csv", str(catalog_csv), + "--links-csv", + str(links_csv), ], input="q\n", color=True, ) self.assertEqual(0, result.exit_code) - self.assertIn("Review 1/1: Resolve normalized_item MIXED PEPPER to catalog_name [__]?", result.output) + self.assertIn("Review guide:", result.output) + self.assertIn("Review 1/1: MIXED PEPPER", result.output) self.assertIn("2 matched items:", result.output) - self.assertIn("[l]ink existing [n]ew catalog e[x]clude [s]kip [q]uit:", result.output) - first_item = result.output.index("[1] 2026-03-14 | 7.49") - second_item = result.output.index("[2] 2026-03-12 | 6.99") + self.assertIn("[#] link to suggestion", result.output) + first_item = result.output.index("[1] MIXED PEPPER 6-PACK | costco | 2026-03-14 | 7.49 | ") + second_item = result.output.index("[2] MIXED PEPPER 6-PACK | costco | 2026-03-12 | 6.99 | https://example.test/mixed-pepper.jpg") self.assertLess(first_item, second_item) - self.assertIn("https://example.test/mixed-pepper.jpg", result.output) self.assertIn("1 catalog_name suggestions found:", result.output) - self.assertIn("[1] MIXED PEPPER", result.output) + self.assertIn("[1] MIXED PEPPER, pepper, produce (1 items, 1 rows)", result.output) self.assertIn("\x1b[", result.output) def test_review_products_no_suggestions_is_informational(self): @@ -199,6 +202,7 @@ class ReviewWorkflowTests(unittest.TestCase): queue_csv = Path(tmpdir) / "review_queue.csv" resolutions_csv = Path(tmpdir) / "review_resolutions.csv" catalog_csv = Path(tmpdir) / "catalog.csv" + links_csv = Path(tmpdir) / "product_links.csv" with purchases_csv.open("w", newline="", encoding="utf-8") as handle: writer = csv.DictWriter( @@ -249,6 +253,8 @@ class ReviewWorkflowTests(unittest.TestCase): str(resolutions_csv), "--catalog-csv", str(catalog_csv), + "--links-csv", + str(links_csv), ], input="q\n", color=True, @@ -263,6 +269,7 @@ class ReviewWorkflowTests(unittest.TestCase): queue_csv = Path(tmpdir) / "review_queue.csv" resolutions_csv = Path(tmpdir) / "review_resolutions.csv" catalog_csv = Path(tmpdir) / "catalog.csv" + links_csv = Path(tmpdir) / "product_links.csv" with purchases_csv.open("w", newline="", encoding="utf-8") as handle: writer = csv.DictWriter( @@ -358,22 +365,23 @@ class ReviewWorkflowTests(unittest.TestCase): str(resolutions_csv), "--catalog-csv", str(catalog_csv), + "--links-csv", + str(links_csv), "--limit", "1", ], - input="l\n1\ny\nlinked by test\n", + input="1\nlinked by test\n", color=True, ) self.assertEqual(0, result.exit_code) - self.assertIn("Select the catalog_name to associate 2 items with:", result.output) - self.assertIn("[1] MIXED PEPPER | cat_mix", result.output) - self.assertIn('2 "MIXED PEPPER" items and future matches will be associated with "MIXED PEPPER".', result.output) - self.assertIn("actions: [y]es [n]o [b]ack [s]kip [q]uit", result.output) with resolutions_csv.open(newline="", encoding="utf-8") as handle: rows = list(csv.DictReader(handle)) + with links_csv.open(newline="", encoding="utf-8") as handle: + link_rows = list(csv.DictReader(handle)) self.assertEqual("cat_mix", rows[0]["catalog_id"]) self.assertEqual("link", rows[0]["resolution_action"]) + self.assertEqual("cat_mix", link_rows[0]["catalog_id"]) def test_review_products_creates_catalog_and_resolution(self): with tempfile.TemporaryDirectory() as tmpdir: @@ -381,6 +389,7 @@ class ReviewWorkflowTests(unittest.TestCase): queue_csv = Path(tmpdir) / "review_queue.csv" resolutions_csv = Path(tmpdir) / "review_resolutions.csv" catalog_csv = Path(tmpdir) / "catalog.csv" + links_csv = Path(tmpdir) / "product_links.csv" with purchases_csv.open("w", newline="", encoding="utf-8") as handle: writer = csv.DictWriter( @@ -426,6 +435,7 @@ class ReviewWorkflowTests(unittest.TestCase): queue_csv=str(queue_csv), resolutions_csv=str(resolutions_csv), catalog_csv=str(catalog_csv), + links_csv=str(links_csv), limit=1, refresh_only=False, ) @@ -433,13 +443,21 @@ class ReviewWorkflowTests(unittest.TestCase): self.assertTrue(queue_csv.exists()) self.assertTrue(resolutions_csv.exists()) self.assertTrue(catalog_csv.exists()) + self.assertTrue(links_csv.exists()) + with queue_csv.open(newline="", encoding="utf-8") as handle: + queue_rows = list(csv.DictReader(handle)) with resolutions_csv.open(newline="", encoding="utf-8") as handle: resolution_rows = list(csv.DictReader(handle)) with catalog_csv.open(newline="", encoding="utf-8") as handle: catalog_rows = list(csv.DictReader(handle)) + with links_csv.open(newline="", encoding="utf-8") as handle: + link_rows = list(csv.DictReader(handle)) + self.assertEqual("approved", queue_rows[0]["status"]) + self.assertEqual("create", queue_rows[0]["resolution_action"]) self.assertEqual("create", resolution_rows[0]["resolution_action"]) self.assertEqual("approved", resolution_rows[0]["status"]) self.assertEqual("ICE", catalog_rows[0]["catalog_name"]) + self.assertEqual(catalog_rows[0]["catalog_id"], link_rows[0]["catalog_id"]) if __name__ == "__main__":