Add catalog search to review flow

This commit is contained in:
ben
2026-03-20 13:32:20 -04:00
parent 17158fb9e9
commit f93b9aa464
2 changed files with 225 additions and 63 deletions

View File

@@ -1,5 +1,6 @@
from collections import defaultdict
from datetime import date
import re
import click
@@ -29,6 +30,7 @@ QUEUE_FIELDS = [
INFO_COLOR = "cyan"
PROMPT_COLOR = "bright_yellow"
WARNING_COLOR = "magenta"
# Token pattern for matching text: each match is a maximal run of ASCII
# uppercase letters and digits. Lowercase text never matches, so callers
# (see tokenize_match_text) upper-case the input before applying it.
TOKEN_RE = re.compile(r"[A-Z0-9]+")
def print_intro_text():
@@ -134,6 +136,13 @@ def sort_related_items(rows):
)
def tokenize_match_text(*values):
    """Return the set of upper-cased tokens found across all ``values``.

    Every value is upper-cased and scanned with ``TOKEN_RE``. Falsy values
    (``None``, ``""``) are treated as empty text and contribute no tokens.
    """
    return {
        token
        for text in values
        for token in TOKEN_RE.findall((text or "").upper())
    }
def build_catalog_suggestions(related_rows, purchase_rows, catalog_rows, limit=3):
normalized_names = {
row.get("normalized_item_name", "").strip().upper()
@@ -190,6 +199,55 @@ def build_catalog_suggestions(related_rows, purchase_rows, catalog_rows, limit=3
return suggestions
def _catalog_field_text(row, key):
    """Return ``row[key]``, substituting ``""`` when it is absent or ``None``."""
    value = row.get(key)
    return "" if value is None else value


def search_catalog_rows(query, catalog_rows, purchase_rows, current_normalized_item_id, limit=10):
    """Rank catalog rows by the number of tokens they share with ``query``.

    Args:
        query: Free-text search string, tokenized with ``tokenize_match_text``.
        catalog_rows: Iterable of catalog dicts. ``catalog_id``,
            ``catalog_name``, ``product_type``, ``category`` and ``variant``
            are read from each.
        purchase_rows: Iterable of purchase dicts. ``catalog_id`` and
            ``normalized_item_id`` are read to count existing links.
        current_normalized_item_id: Normalized item being reviewed. A catalog
            entry that a purchase row already ties to this item is left out
            of the results.
        limit: Maximum number of rows to return.

    Returns:
        A list of dicts with ``catalog_id``, ``catalog_name``,
        ``product_type``, ``category``, ``variant``,
        ``linked_normalized_items``, ``linked_purchase_rows`` and ``score``
        (count of shared tokens), ordered by descending score, then
        ``catalog_name``, then ``catalog_id``. Empty when ``query`` yields
        no tokens or nothing overlaps.
    """
    query_tokens = tokenize_match_text(query)
    if not query_tokens:
        return []

    linked_purchase_counts = defaultdict(int)
    linked_normalized_ids = defaultdict(set)
    current_catalog_id = ""
    for row in purchase_rows:
        catalog_id = row.get("catalog_id", "")
        normalized_item_id = row.get("normalized_item_id", "")
        if catalog_id and normalized_item_id:
            linked_purchase_counts[catalog_id] += 1
            linked_normalized_ids[catalog_id].add(normalized_item_id)
        # When several purchase rows match, the last one seen wins.
        if normalized_item_id == current_normalized_item_id and catalog_id:
            current_catalog_id = catalog_id

    ranked_rows = []
    for row in catalog_rows:
        catalog_id = row.get("catalog_id", "")
        if not catalog_id or catalog_id == current_catalog_id:
            continue

        # Rows may carry None for a field (e.g. csv.DictReader fills short
        # rows with None -- presumably how catalog rows are loaded; confirm).
        # Coerce to "" so the sort key below never compares None with str.
        catalog_name = _catalog_field_text(row, "catalog_name")
        product_type = _catalog_field_text(row, "product_type")
        variant = _catalog_field_text(row, "variant")

        overlap = query_tokens & tokenize_match_text(catalog_name, product_type, variant)
        if not overlap:
            continue

        ranked_rows.append(
            {
                "catalog_id": catalog_id,
                "catalog_name": catalog_name,
                "product_type": product_type,
                "category": _catalog_field_text(row, "category"),
                "variant": variant,
                "linked_normalized_items": len(linked_normalized_ids.get(catalog_id, set())),
                "linked_purchase_rows": linked_purchase_counts.get(catalog_id, 0),
                "score": len(overlap),
            }
        )

    # Best score first; name and id give a deterministic order among ties.
    ranked_rows.sort(
        key=lambda row: (-row["score"], row["catalog_name"], row["catalog_id"])
    )
    return ranked_rows[:limit]
def suggestion_display_rows(suggestions, purchase_rows, catalog_rows):
linked_purchase_counts = defaultdict(int)
linked_normalized_ids = defaultdict(set)
@@ -235,6 +293,15 @@ def suggestion_display_rows(suggestions, purchase_rows, catalog_rows):
return display_rows
def print_catalog_rows(rows):
    """Echo catalog rows as a 1-based numbered list via ``click.echo``.

    Each row must provide ``catalog_name``, ``linked_normalized_items`` and
    ``linked_purchase_rows``. ``product_type`` and ``category`` are optional
    and render as empty text when missing.
    """
    position = 0
    for entry in rows:
        position += 1
        line = (
            f" [{position}] {entry['catalog_name']}, {entry.get('product_type', '')}, "
            f"{entry.get('category', '')} ({entry['linked_normalized_items']} items, "
            f"{entry['linked_purchase_rows']} rows)"
        )
        click.echo(line)
def build_display_lines(related_rows):
lines = []
for index, row in enumerate(sort_related_items(related_rows), start=1):
@@ -267,12 +334,7 @@ def choose_existing_catalog(display_rows, normalized_name, matched_count):
f"Select the catalog_name to associate {matched_count} items with:",
fg=INFO_COLOR,
)
for index, row in enumerate(display_rows, start=1):
click.echo(
f" [{index}] {row['catalog_name']}, {row.get('product_type', '')}, "
f"{row.get('category', '')} ({row['linked_normalized_items']} items, "
f"{row['linked_purchase_rows']} rows)"
)
print_catalog_rows(display_rows)
choice = click.prompt(
click.style("selection", fg=PROMPT_COLOR),
type=click.IntRange(1, len(display_rows)),
@@ -314,18 +376,13 @@ def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queu
click.echo(line)
if suggestions:
click.echo(f"{len(suggestions)} catalog_name suggestions found:")
for index, suggestion in enumerate(suggestions, start=1):
click.echo(
f" [{index}] {suggestion['catalog_name']}, {suggestion.get('product_type', '')}, "
f"{suggestion.get('category', '')} ({suggestion['linked_normalized_items']} items, "
f"{suggestion['linked_purchase_rows']} rows)"
)
print_catalog_rows(suggestions)
else:
click.echo("no catalog_name suggestions found")
prompt_bits = []
if suggestions:
prompt_bits.append("[#] link to suggestion")
prompt_bits.extend(["[l]ink existing", "[n]ew", "[s]kip", "e[x]clude", "[q]uit"])
prompt_bits.extend(["[s]earch", "[n]ew", "e[x]clude", "[q]uit"])
click.secho(" ".join(prompt_bits) + " >", fg=PROMPT_COLOR)
action = click.prompt("", type=str, prompt_suffix=" ").strip().lower()
if action.isdigit() and suggestions:
@@ -346,14 +403,42 @@ def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queu
if action == "q":
return None, None
if action == "s":
return {
"normalized_item_id": queue_row["normalized_item_id"],
"catalog_id": "",
"resolution_action": "skip",
"status": "pending",
"resolution_notes": queue_row.get("resolution_notes", ""),
"reviewed_at": str(date.today()),
}, None
while True:
query = click.prompt(click.style("search", fg=PROMPT_COLOR), default="", show_default=False).strip()
if not query:
return prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total)
search_rows = search_catalog_rows(
query,
catalog_rows,
purchase_rows,
queue_row["normalized_item_id"],
)
if not search_rows:
click.echo("no matches found")
retry = click.prompt(
click.style("search again? [enter=yes, q=no]", fg=PROMPT_COLOR),
default="",
show_default=False,
).strip().lower()
if retry == "q":
return prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total)
continue
click.echo(f"{len(search_rows)} search results found:")
print_catalog_rows(search_rows)
choice = click.prompt(
click.style("selection", fg=PROMPT_COLOR),
type=click.IntRange(1, len(search_rows)),
)
chosen_row = search_rows[choice - 1]
notes = click.prompt(click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False)
return {
"normalized_item_id": queue_row["normalized_item_id"],
"catalog_id": chosen_row["catalog_id"],
"resolution_action": "link",
"status": "approved",
"resolution_notes": notes,
"reviewed_at": str(date.today()),
}, None
if action == "x":
notes = click.prompt(click.style("exclude notes", fg=PROMPT_COLOR), default="", show_default=False)
return {
@@ -364,45 +449,6 @@ def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queu
"resolution_notes": notes,
"reviewed_at": str(date.today()),
}, None
if action == "l":
display_rows = suggestions or [
{
"catalog_id": row["catalog_id"],
"catalog_name": row["catalog_name"],
"reason": "catalog sample",
"product_type": row.get("product_type", ""),
"category": row.get("category", ""),
"linked_normalized_items": 0,
"linked_purchase_rows": 0,
}
for row in catalog_rows[:10]
if row.get("catalog_id")
]
while True:
catalog_id, outcome = choose_existing_catalog(display_rows, normalized_name, matched_count)
if outcome == "skip":
return {
"normalized_item_id": queue_row["normalized_item_id"],
"catalog_id": "",
"resolution_action": "skip",
"status": "pending",
"resolution_notes": queue_row.get("resolution_notes", ""),
"reviewed_at": str(date.today()),
}, None
if outcome == "quit":
return None, None
if outcome == "back":
continue
break
notes = click.prompt(click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False)
return {
"normalized_item_id": queue_row["normalized_item_id"],
"catalog_id": catalog_id,
"resolution_action": "link",
"status": "approved",
"resolution_notes": notes,
"reviewed_at": str(date.today()),
}, None
if action != "n":
click.secho("invalid action", fg=WARNING_COLOR)
return prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total)

View File

@@ -76,6 +76,37 @@ class ReviewWorkflowTests(unittest.TestCase):
self.assertEqual("cat_2", suggestions[0]["catalog_id"])
self.assertEqual("exact upc", suggestions[0]["reason"])
def test_search_catalog_rows_ranks_token_overlap(self):
    """The catalog row sharing more query tokens is ranked first."""
    mixed_pepper = {
        "catalog_id": "cat_1",
        "catalog_name": "MIXED PEPPER",
        "product_type": "pepper",
        "category": "produce",
        "variant": "",
    }
    ground_pepper = {
        "catalog_id": "cat_2",
        "catalog_name": "GROUND PEPPER",
        "product_type": "spice",
        "category": "baking",
        "variant": "",
    }
    # Linked to a different normalized item than the one being searched for,
    # so cat_1 is not filtered out as the current catalog.
    linked_purchase = {
        "normalized_item_id": "gnorm_mix",
        "catalog_id": "cat_1",
    }

    results = review_products.search_catalog_rows(
        "mixed pepper",
        [mixed_pepper, ground_pepper],
        [linked_purchase],
        "cnorm_mix",
    )

    top = results[0]
    self.assertEqual("cat_1", top["catalog_id"])
    self.assertGreater(top["score"], results[1]["score"])
def test_review_products_displays_position_items_and_suggestions(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"
@@ -188,7 +219,7 @@ class ReviewWorkflowTests(unittest.TestCase):
self.assertIn("Review guide:", result.output)
self.assertIn("Review 1/1: MIXED PEPPER", result.output)
self.assertIn("2 matched items:", result.output)
self.assertIn("[#] link to suggestion", result.output)
self.assertIn("[#] link to suggestion [s]earch [n]ew e[x]clude [q]uit >", result.output)
first_item = result.output.index("[1] MIXED PEPPER 6-PACK | costco | 2026-03-14 | 7.49 | ")
second_item = result.output.index("[2] MIXED PEPPER 6-PACK | costco | 2026-03-12 | 6.99 | https://example.test/mixed-pepper.jpg")
self.assertLess(first_item, second_item)
@@ -263,7 +294,7 @@ class ReviewWorkflowTests(unittest.TestCase):
self.assertEqual(0, result.exit_code)
self.assertIn("no catalog_name suggestions found", result.output)
def test_link_existing_uses_numbered_selection_and_confirmation(self):
def test_search_links_catalog_and_writes_link_row(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"
queue_csv = Path(tmpdir) / "review_queue.csv"
@@ -370,11 +401,12 @@ class ReviewWorkflowTests(unittest.TestCase):
"--limit",
"1",
],
input="1\nlinked by test\n",
input="s\nmixed pepper\n1\nlinked by test\n",
color=True,
)
self.assertEqual(0, result.exit_code)
self.assertIn("1 search results found:", result.output)
with resolutions_csv.open(newline="", encoding="utf-8") as handle:
rows = list(csv.DictReader(handle))
with links_csv.open(newline="", encoding="utf-8") as handle:
@@ -383,6 +415,90 @@ class ReviewWorkflowTests(unittest.TestCase):
self.assertEqual("link", rows[0]["resolution_action"])
self.assertEqual("cat_mix", link_rows[0]["catalog_id"])
def test_search_no_matches_allows_retry_or_return(self):
    """A search with no hits reports "no matches found" and exits cleanly.

    The scripted input enters search (``s``), submits a query that matches
    nothing (``zzz``), declines to search again (``q``) and then quits the
    review (``q``).
    """
    purchase_fields = [
        "purchase_date",
        "retailer",
        "order_id",
        "line_no",
        "normalized_item_id",
        "catalog_id",
        "raw_item_name",
        "normalized_item_name",
        "image_url",
        "upc",
        "line_total",
    ]
    purchase_row = {
        "purchase_date": "2026-03-14",
        "retailer": "giant",
        "order_id": "g1",
        "line_no": "1",
        "normalized_item_id": "gnorm_ice",
        "catalog_id": "",
        "raw_item_name": "SB BAGGED ICE 20LB",
        "normalized_item_name": "BAGGED ICE",
        "image_url": "",
        "upc": "",
        "line_total": "3.50",
    }
    catalog_row = {
        "catalog_id": "cat_ice",
        "catalog_name": "ICE",
        "category": "frozen",
        "product_type": "ice",
        "brand": "",
        "variant": "",
        "size_value": "",
        "size_unit": "",
        "pack_qty": "",
        "measure_type": "",
        "notes": "",
        "created_at": "",
        "updated_at": "",
    }

    with tempfile.TemporaryDirectory() as tmpdir:
        workdir = Path(tmpdir)
        purchases_csv = workdir / "purchases.csv"
        queue_csv = workdir / "review_queue.csv"
        resolutions_csv = workdir / "review_resolutions.csv"
        catalog_csv = workdir / "catalog.csv"
        links_csv = workdir / "product_links.csv"

        with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
            purchase_writer = csv.DictWriter(handle, fieldnames=purchase_fields)
            purchase_writer.writeheader()
            purchase_writer.writerow(purchase_row)

        with catalog_csv.open("w", newline="", encoding="utf-8") as handle:
            catalog_writer = csv.DictWriter(
                handle,
                fieldnames=review_products.build_purchases.CATALOG_FIELDS,
            )
            catalog_writer.writeheader()
            catalog_writer.writerow(catalog_row)

        cli_args = [
            "--purchases-csv",
            str(purchases_csv),
            "--queue-csv",
            str(queue_csv),
            "--resolutions-csv",
            str(resolutions_csv),
            "--catalog-csv",
            str(catalog_csv),
            "--links-csv",
            str(links_csv),
        ]
        result = CliRunner().invoke(
            review_products.main,
            cli_args,
            input="s\nzzz\nq\nq\n",
            color=True,
        )

        self.assertEqual(0, result.exit_code)
        self.assertIn("no matches found", result.output)
def test_review_products_creates_catalog_and_resolution(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"