"""Build the product review queue and interactively resolve its entries.

The queue lists observed products whose purchase rows are not all linked to
a canonical product.  Running the command rewrites the queue CSV and, unless
``--refresh-only`` is given, prompts the reviewer to link, create, exclude or
skip each entry, then saves the resolutions and the catalog.
"""

from collections import defaultdict
from datetime import date

import click

import build_purchases
from layer_helpers import compact_join, stable_id, write_csv_rows


QUEUE_FIELDS = [
    "review_id",
    "retailer",
    "observed_product_id",
    "canonical_product_id",
    "reason_code",
    "priority",
    "raw_item_names",
    "normalized_names",
    "upc_values",
    "example_prices",
    "seen_count",
    "status",
    "resolution_action",
    "resolution_notes",
    "created_at",
    "updated_at",
]


def _distinct_values(rows, field):
    """Return the sorted distinct non-empty values of ``field`` across rows."""
    return sorted({row[field] for row in rows if row[field]})


def build_review_queue(purchase_rows, resolution_rows):
    """Build one queue row per observed product that still needs review.

    Purchase rows are grouped by ``observed_product_id`` (rows without one
    are ignored).  A group is left out of the queue when its current
    resolution has status ``"approved"`` or when every row in the group
    already carries a ``canonical_product_id``.

    Args:
        purchase_rows: iterable of purchase row dicts.
        resolution_rows: existing resolution rows, passed to
            ``build_purchases.load_resolution_lookup``; the result is used
            as a mapping keyed by observed product id.

    Returns:
        A list of dicts with the keys in ``QUEUE_FIELDS``, ordered by
        observed product id.
    """
    resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows)

    by_observed = defaultdict(list)
    for row in purchase_rows:
        observed_product_id = row.get("observed_product_id", "")
        if observed_product_id:
            by_observed[observed_product_id].append(row)

    today_text = str(date.today())
    queue_rows = []
    for observed_product_id, rows in sorted(by_observed.items()):
        current_resolution = resolution_lookup.get(observed_product_id, {})
        if current_resolution.get("status") == "approved":
            continue
        # Nothing to review when every purchase row is already linked.
        if all(row.get("canonical_product_id") for row in rows):
            continue

        retailers = sorted({row["retailer"] for row in rows})
        queue_rows.append(
            {
                "review_id": stable_id("rvw", observed_product_id),
                "retailer": " | ".join(retailers),
                "observed_product_id": observed_product_id,
                "canonical_product_id": current_resolution.get("canonical_product_id", ""),
                "reason_code": "missing_canonical_link",
                "priority": "high",
                "raw_item_names": compact_join(
                    _distinct_values(rows, "raw_item_name"),
                    limit=8,
                ),
                "normalized_names": compact_join(
                    _distinct_values(rows, "normalized_item_name"),
                    limit=8,
                ),
                "upc_values": compact_join(
                    _distinct_values(rows, "upc"),
                    limit=8,
                ),
                "example_prices": compact_join(
                    _distinct_values(rows, "line_total"),
                    limit=8,
                ),
                "seen_count": str(len(rows)),
                "status": current_resolution.get("status", "pending"),
                "resolution_action": current_resolution.get("resolution_action", ""),
                "resolution_notes": current_resolution.get("resolution_notes", ""),
                "created_at": current_resolution.get("reviewed_at", today_text),
                "updated_at": today_text,
            }
        )
    return queue_rows


def save_resolution_rows(path, rows):
    """Write resolution rows to ``path`` using the resolution column set."""
    write_csv_rows(path, rows, build_purchases.RESOLUTION_FIELDS)


def save_catalog_rows(path, rows):
    """Write catalog rows to ``path`` using the catalog column set."""
    write_csv_rows(path, rows, build_purchases.CATALOG_FIELDS)


INFO_COLOR = "cyan"
PROMPT_COLOR = "bright_yellow"
WARNING_COLOR = "magenta"


def _line_number(row):
    """Return the row's ``line_no`` as an int, or 0 when missing or malformed."""
    try:
        return int(row.get("line_no", "0") or "0")
    except (TypeError, ValueError):
        # A malformed line number should not abort the review session; it
        # only affects display ordering.
        return 0


def sort_related_items(rows):
    """Return rows ordered by purchase date, order id and line number, descending."""
    return sorted(
        rows,
        key=lambda row: (
            row.get("purchase_date", ""),
            row.get("order_id", ""),
            _line_number(row),
        ),
        reverse=True,
    )


def build_canonical_suggestions(related_rows, catalog_rows, limit=3):
    """Suggest catalog entries that may match the given purchase rows.

    Matches are collected in three passes, stopping as soon as ``limit``
    suggestions have been gathered: catalog rows whose UPC equals one of the
    purchase UPCs, rows whose canonical name equals a normalized item name
    (case-insensitive), and rows whose canonical name contains, or is
    contained in, a normalized item name.

    Args:
        related_rows: purchase row dicts for one observed product.
        catalog_rows: catalog row dicts.
        limit: number of suggestions after which collection stops.

    Returns:
        A list of dicts with ``canonical_product_id``, ``canonical_name`` and
        ``reason``, without duplicate product ids.
    """
    normalized_names = {
        row.get("normalized_item_name", "").strip().upper()
        for row in related_rows
        if row.get("normalized_item_name", "").strip()
    }
    upcs = {
        row.get("upc", "").strip()
        for row in related_rows
        if row.get("upc", "").strip()
    }
    suggestions = []
    seen_ids = set()

    def add_matches(rows, reason):
        """Append unseen rows as suggestions; return True once the limit is hit."""
        for row in rows:
            canonical_product_id = row.get("canonical_product_id", "")
            if not canonical_product_id or canonical_product_id in seen_ids:
                continue
            seen_ids.add(canonical_product_id)
            suggestions.append(
                {
                    "canonical_product_id": canonical_product_id,
                    "canonical_name": row.get("canonical_name", ""),
                    "reason": reason,
                }
            )
            if len(suggestions) >= limit:
                return True
        return False

    exact_upc_rows = [
        row
        for row in catalog_rows
        if row.get("upc", "").strip() and row.get("upc", "").strip() in upcs
    ]
    if add_matches(exact_upc_rows, "exact upc"):
        return suggestions

    exact_name_rows = [
        row
        for row in catalog_rows
        if row.get("canonical_name", "").strip().upper() in normalized_names
    ]
    if add_matches(exact_name_rows, "exact normalized name"):
        return suggestions

    contains_rows = []
    for row in catalog_rows:
        canonical_name = row.get("canonical_name", "").strip().upper()
        if not canonical_name:
            continue
        if any(
            normalized_name in canonical_name or canonical_name in normalized_name
            for normalized_name in normalized_names
        ):
            contains_rows.append(row)
    add_matches(contains_rows, "canonical name contains match")
    return suggestions


def build_display_lines(queue_row, related_rows):
    """Format the related purchase rows as numbered text lines for display.

    Rows are listed in the order given by ``sort_related_items``; a row with
    an ``image_url`` gets an extra line showing it.  When there are no rows a
    single placeholder line is returned.  ``queue_row`` is accepted for
    interface compatibility and is not read.
    """
    lines = []
    for index, row in enumerate(sort_related_items(related_rows), start=1):
        lines.append(
            " [{index}] {purchase_date} | {line_total} | {raw_item_name} | {normalized_item_name} | "
            "{upc} | {retailer}".format(
                index=index,
                purchase_date=row.get("purchase_date", ""),
                line_total=row.get("line_total", ""),
                raw_item_name=row.get("raw_item_name", ""),
                normalized_item_name=row.get("normalized_item_name", ""),
                upc=row.get("upc", ""),
                retailer=row.get("retailer", ""),
            )
        )
        if row.get("image_url"):
            lines.append(f" {row['image_url']}")
    if not lines:
        lines.append(" [1] no matched item rows found")
    return lines


def observed_name(queue_row, related_rows):
    """Pick a human-readable label for the observed product under review.

    Preference order: the first ``" | "``-separated entry of the queue row's
    ``normalized_names``, then the first non-empty ``normalized_item_name``
    among the related rows, then the observed product id.
    """
    if queue_row.get("normalized_names"):
        return queue_row["normalized_names"].split(" | ")[0]
    for row in related_rows:
        if row.get("normalized_item_name"):
            return row["normalized_item_name"]
    return queue_row.get("observed_product_id", "")


def choose_existing_canonical(display_rows, observed_label, matched_count):
    """Prompt the reviewer to pick and confirm an existing canonical product.

    Args:
        display_rows: non-empty list of dicts with ``canonical_name`` and
            ``canonical_product_id``; the selection prompt accepts
            ``1..len(display_rows)``.
        observed_label: label of the observed product, used in messages.
        matched_count: number of purchase rows being linked, used in messages.

    Returns:
        ``(canonical_product_id, "")`` when confirmed, otherwise
        ``("", outcome)`` with outcome ``"skip"``, ``"quit"`` or ``"back"``
        (both the ``n`` and ``b`` answers yield ``"back"``).
    """
    click.secho(
        f"Select the canonical_name to associate {matched_count} items with:",
        fg=INFO_COLOR,
    )
    for index, row in enumerate(display_rows, start=1):
        click.echo(f" [{index}] {row['canonical_name']} | {row['canonical_product_id']}")
    choice = click.prompt(
        click.style("selection", fg=PROMPT_COLOR),
        type=click.IntRange(1, len(display_rows)),
    )
    chosen_row = display_rows[choice - 1]
    click.echo(
        f'{matched_count} "{observed_label}" items and future matches will be associated '
        f'with "{chosen_row["canonical_name"]}".'
    )
    click.secho(
        "actions: [y]es [n]o [b]ack [s]kip [q]uit",
        fg=PROMPT_COLOR,
    )
    confirm = click.prompt(
        click.style("confirm", fg=PROMPT_COLOR),
        type=click.Choice(["y", "n", "b", "s", "q"]),
    )
    if confirm == "y":
        return chosen_row["canonical_product_id"], ""
    if confirm == "s":
        return "", "skip"
    if confirm == "q":
        return "", "quit"
    return "", "back"


def _resolution_row(queue_row, canonical_product_id, resolution_action, status, notes):
    """Build a resolution row for ``queue_row`` stamped with today's date."""
    return {
        "observed_product_id": queue_row["observed_product_id"],
        "canonical_product_id": canonical_product_id,
        "resolution_action": resolution_action,
        "status": status,
        "resolution_notes": notes,
        "reviewed_at": str(date.today()),
    }


def _skip_resolution(queue_row):
    """Build the pending resolution row recorded when an item is skipped."""
    return _resolution_row(
        queue_row,
        "",
        "skip",
        "pending",
        queue_row.get("resolution_notes", ""),
    )


def prompt_resolution(queue_row, related_rows, catalog_rows, queue_index, queue_total):
    """Interactively resolve one queue entry.

    Shows the related purchase rows and any canonical suggestions, then asks
    the reviewer to link to an existing canonical product, create a new one,
    exclude the item, skip it, or quit.

    Args:
        queue_row: the queue row dict being reviewed.
        related_rows: purchase rows for this observed product.
        catalog_rows: current catalog rows, used for suggestions and as the
            fallback list of link candidates.
        queue_index: 1-based position of this entry, for display.
        queue_total: total number of queue entries, for display.

    Returns:
        ``(None, None)`` when the reviewer quits; otherwise
        ``(resolution_row, canonical_row)`` where ``canonical_row`` is a new
        catalog row for the "new canonical" action and ``None`` for every
        other action.
    """
    suggestions = build_canonical_suggestions(related_rows, catalog_rows)
    observed_label = observed_name(queue_row, related_rows)
    matched_count = len(related_rows)

    click.echo("")
    click.secho(
        f"Review {queue_index}/{queue_total}: Resolve observed_product {observed_label} "
        "to canonical_name [__]?",
        fg=INFO_COLOR,
    )
    click.echo(f"{matched_count} matched items:")
    for line in build_display_lines(queue_row, related_rows):
        click.echo(line)
    if suggestions:
        click.echo(f"{len(suggestions)} canonical suggestions found:")
        for index, suggestion in enumerate(suggestions, start=1):
            click.echo(f" [{index}] {suggestion['canonical_name']}")
    else:
        click.echo("no canonical_name suggestions found")

    click.secho(
        "[l]ink existing [n]ew canonical e[x]clude [s]kip [q]uit:",
        fg=PROMPT_COLOR,
    )
    action = click.prompt(
        "",
        type=click.Choice(["l", "n", "x", "s", "q"]),
        prompt_suffix=" ",
    )

    if action == "q":
        return None, None
    if action == "s":
        return _skip_resolution(queue_row), None
    if action == "x":
        notes = click.prompt(
            click.style("exclude notes", fg=PROMPT_COLOR),
            default="",
            show_default=False,
        )
        return _resolution_row(queue_row, "", "exclude", "approved", notes), None
    if action == "l":
        # Fall back to a sample of the catalog when nothing was suggested.
        # Rows without an id cannot be linked to, so they are left out.
        link_candidates = suggestions or [
            {
                "canonical_product_id": row["canonical_product_id"],
                "canonical_name": row.get("canonical_name", ""),
                "reason": "catalog sample",
            }
            for row in catalog_rows
            if row.get("canonical_product_id")
        ][:10]
        if not link_candidates:
            # An empty candidate list would make the selection prompt
            # impossible to answer (its valid range would be 1..0), so the
            # item is left pending instead.
            click.secho(
                "no canonical products available to link; leaving this item pending",
                fg=WARNING_COLOR,
            )
            return _skip_resolution(queue_row), None

        while True:
            canonical_product_id, outcome = choose_existing_canonical(
                link_candidates,
                observed_label,
                matched_count,
            )
            if outcome == "skip":
                return _skip_resolution(queue_row), None
            if outcome == "quit":
                return None, None
            if outcome != "back":
                break
            # "back": show the candidate list again.

        notes = click.prompt(
            click.style("link notes", fg=PROMPT_COLOR),
            default="",
            show_default=False,
        )
        return (
            _resolution_row(queue_row, canonical_product_id, "link", "approved", notes),
            None,
        )

    # Remaining action is "n": create a new canonical product.
    canonical_name = click.prompt(click.style("canonical name", fg=PROMPT_COLOR), type=str)
    category = click.prompt(
        click.style("category", fg=PROMPT_COLOR),
        default="",
        show_default=False,
    )
    product_type = click.prompt(
        click.style("product type", fg=PROMPT_COLOR),
        default="",
        show_default=False,
    )
    notes = click.prompt(
        click.style("notes", fg=PROMPT_COLOR),
        default="",
        show_default=False,
    )
    canonical_product_id = stable_id("gcan", f"manual|{canonical_name}|{category}|{product_type}")
    canonical_row = {
        "canonical_product_id": canonical_product_id,
        "canonical_name": canonical_name,
        "category": category,
        "product_type": product_type,
        "brand": "",
        "variant": "",
        "size_value": "",
        "size_unit": "",
        "pack_qty": "",
        "measure_type": "",
        "notes": notes,
        "created_at": str(date.today()),
        "updated_at": str(date.today()),
    }
    resolution_row = _resolution_row(
        queue_row,
        canonical_product_id,
        "create",
        "approved",
        notes,
    )
    return resolution_row, canonical_row


@click.command()
@click.option("--purchases-csv", default="combined_output/purchases.csv", show_default=True)
@click.option("--queue-csv", default="combined_output/review_queue.csv", show_default=True)
@click.option("--resolutions-csv", default="combined_output/review_resolutions.csv", show_default=True)
@click.option("--catalog-csv", default="combined_output/canonical_catalog.csv", show_default=True)
@click.option("--limit", default=0, show_default=True, type=int)
@click.option("--refresh-only", is_flag=True, help="Only rebuild review_queue.csv without prompting.")
def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, limit, refresh_only):
    """Rebuild the review queue and optionally review it interactively.

    The queue CSV is always rewritten.  Unless ``--refresh-only`` is set,
    each queue entry is then prompted for, until the reviewer quits or
    ``--limit`` entries have been handled (0 means no limit; skipped entries
    count towards it).  Resolutions and catalog rows are saved afterwards.
    """
    purchase_rows = build_purchases.read_optional_csv_rows(purchases_csv)
    resolution_rows = build_purchases.read_optional_csv_rows(resolutions_csv)
    catalog_rows = build_purchases.read_optional_csv_rows(catalog_csv)

    queue_rows = build_review_queue(purchase_rows, resolution_rows)
    write_csv_rows(queue_csv, queue_rows, QUEUE_FIELDS)
    click.echo(f"wrote {len(queue_rows)} rows to {queue_csv}")

    if refresh_only:
        return

    resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows)
    catalog_by_id = {
        row["canonical_product_id"]: row
        for row in catalog_rows
        if row.get("canonical_product_id")
    }
    rows_by_observed = defaultdict(list)
    for row in purchase_rows:
        observed_product_id = row.get("observed_product_id", "")
        if observed_product_id:
            rows_by_observed[observed_product_id].append(row)

    reviewed = 0
    queue_total = len(queue_rows)
    for index, queue_row in enumerate(queue_rows, start=1):
        if limit and reviewed >= limit:
            break
        related_rows = rows_by_observed.get(queue_row["observed_product_id"], [])
        resolution_row, canonical_row = prompt_resolution(
            queue_row,
            related_rows,
            catalog_rows,
            index,
            queue_total,
        )
        if resolution_row is None and canonical_row is None:
            # Reviewer chose to quit; keep what has been resolved so far.
            break
        resolution_lookup[resolution_row["observed_product_id"]] = resolution_row
        if canonical_row and canonical_row["canonical_product_id"] not in catalog_by_id:
            catalog_by_id[canonical_row["canonical_product_id"]] = canonical_row
            # Make the new entry available as a suggestion for later items.
            catalog_rows.append(canonical_row)
        reviewed += 1

    save_resolution_rows(
        resolutions_csv,
        sorted(resolution_lookup.values(), key=lambda row: row["observed_product_id"]),
    )
    save_catalog_rows(
        catalog_csv,
        sorted(catalog_by_id.values(), key=lambda row: row["canonical_product_id"]),
    )
    click.echo(
        f"saved {len(resolution_lookup)} resolution rows to {resolutions_csv} "
        f"and {len(catalog_by_id)} catalog rows to {catalog_csv}"
    )


if __name__ == "__main__":
    main()