Compare commits

...

6 Commits

Author SHA1 Message Date
ben
afadd0c0d0 Restore skip and move search to find 2026-03-20 13:35:07 -04:00
ben
2847d2d59f Record t1.16.1 task evidence 2026-03-20 13:32:27 -04:00
ben
f93b9aa464 Add catalog search to review flow 2026-03-20 13:32:20 -04:00
ben
17158fb9e9 Record t1.16 task evidence 2026-03-20 12:45:57 -04:00
ben
975d44bebb Tighten review prompt flow 2026-03-20 12:45:38 -04:00
ben
f478795b5d added t1.16 to cleanup review process 2026-03-20 12:42:23 -04:00
3 changed files with 557 additions and 73 deletions

View File

@@ -624,7 +624,7 @@ tighten Costco-specific normalization so normalized item names are cleaner and d
- The structured parsing still owns size/pack extraction, so name cleanup can safely strip dual-unit and logistics fragments after those fields are parsed.
- Discount-line behavior remains unchanged; this task only cleaned normalized names and preserved the existing audit trail.
* [x] t1.15: refactor review/combine pipeline around normalized_item_id and catalog links (4-8 commits)
* [X] t1.15: refactor review/combine pipeline around normalized_item_id and catalog links (4-8 commits)
replace the old observed/canonical workflow with a review-first pipeline that uses normalized_item_id as the retailer-level review unit and links it to catalog items
** Acceptance Criteria
@@ -677,6 +677,93 @@ replace the old observed/canonical workflow with a review-first pipeline that us
- Existing auto-generated catalog rows are no longer carried forward by default; only deliberate catalog entries survive. That keeps the new `catalog.csv` conservative, but it also means prior observed-based auto-links do not migrate into the new model.
- Live rerun after the refactor produced `627` purchase rows, `387` review-queue rows, `407` distinct normalized items, `0` linked normalized items, and `0` unresolved rows missing from the review queue.
* [X] t1.16: cleanup review process and format
** acceptance criteria
1. Add intro text explaining:
1. catalog name: unique product including variant but not packaging, eg "whole milk", "sharp cheddar cheese"
2. product type: general product you would like to compare to, eg "milk", "cheese"
3. category: eg "dairy"
2. Reformat input per item
1. Change matched item field display order
2. Add count of distinct normalized_item_ids and total purchase rows already linked to the catalog item
3. Add option to select catalog suggestion directly
#+begin_comment
Review 7/22: MIXED PEPPER 6-PK
2 matched items:
- MIXED PEPPER 6-PK | costco | 2026-03-12 | 7.49 | [img_url]
- [raw_name] | [retailer] | [YYYY-mm-dd] | [price] | [img_url]
2 catalog suggestions found:
[1] bell pepper, pepper, produce (42 items)
[2] ground pepper, spice, baking (1 item)
[#] link to suggestion [n]ew [s]kip e[x]clude [q]uit >
#+end_comment
3. When creating new, ask for input in catalog_name, product_type, category order
1. enter to accept blank value
4. Each reviewed item is saved after user input, not at the end of the script.
1. on new creation, create entry in catalog.csv and create entry in product_links.csv
2. on link existing, create entry in product_links.csv
3. update review_queue.csv status for item immediately after action
5. linking operates at normalized_item_id level, not per normalized_row_id
6. ensure catalog.csv and product_links.csv are human-editable and consistent so manual correction is possible without tooling
** evidence
- commit: `975d44b`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python review_products.py --refresh-only`; `./venv/bin/python review_products.py --help`
- datetime: 2026-03-20 12:45:25 EDT
** notes
- The main flow change is operational rather than architectural: each review decision now persists immediately to `review_resolutions.csv`, `catalog.csv`, `product_links.csv`, and the on-disk `review_queue.csv`.
- Direct numeric selection works well for suggestion-heavy review, while `[l]ink existing` remains available as a fallback when the suggestion list is empty or incomplete.
- I kept the review data model unchanged from `t1.15`; this task only tightened the prompt format, field order, and save behavior.
* [X] t1.16.1: add catalog search flow to review ui (2-3 commits)
enable fast lookup of catalog items during review via tokenized search and replace manual list scanning
** acceptance criteria
1. replace `[l]ink existing` with `[f]ind` in review prompt:
- `[#] link to suggestion [f]ind [n]ew [s]kip [x]exclude [q]uit >`
2. implement search flow:
- on `s`, prompt: `search: `
- tokenize input using same normalization rules as suggestion matching
- return ranked list of catalog items where tokens overlap with:
- catalog_name
- product_type
- variant
- display results in same numbered format as suggestions:
[1] flour, flour, baking (12 items, 48 rows)
3. allow direct selection from search results:
- when user inputs number, immediately creates approved resolution and product_links rows
- returns to next review item
4. reuse match logic used for suggestion matching; no new matching system introduced
- future improvements to matching logic will therefore apply in both places
5. search results exclude already-linked current normalized_item_id target
6. fallback behavior:
- if no results, print `no matches found`
- allow retry or return to main prompt
7. keep interaction tight:
- no full catalog dump
- max ~10 results returned
- sorted by simple score (token overlap count)
8. persistence:
- selected link writes immediately to `product_links.csv`
- no buffering until script end
- pm note: optimize for speed over correctness; this is a manual assist tool, not a ranking system
- pm note: improve manual lookup flow only, don't retool or create a second algorithm
** evidence
- commit: `f93b9aa`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python review_products.py --help`; `./venv/bin/python review_products.py --refresh-only`
- datetime: 2026-03-20 13:34:57 EDT
** notes
- The search path reuses the same lightweight token matching rules as suggestion ranking, so there is still only one matching system to maintain.
- Direct numeric suggestion-pick remains the fastest happy path; search is the fallback when suggestions are sparse or missing.
- Search intentionally optimizes for manual speed rather than smart ranking: simple token overlap, max 10 rows, and immediate persistence on selection.
- Follow-up fix: search moved to `[f]ind` so `[s]kip` remains available at the main prompt.
* [ ] 1t.10: add optional llm-assisted suggestion workflow for unresolved normalized retailer items (2-4 commits)
** acceptance criteria

View File

@@ -1,5 +1,6 @@
from collections import defaultdict
from datetime import date
import re
import click
@@ -29,6 +30,14 @@ QUEUE_FIELDS = [
INFO_COLOR = "cyan"
PROMPT_COLOR = "bright_yellow"
WARNING_COLOR = "magenta"
TOKEN_RE = re.compile(r"[A-Z0-9]+")
def print_intro_text():
click.secho("Review guide:", fg=INFO_COLOR)
click.echo(" catalog name: unique product identity including variant, but not packaging")
click.echo(" product type: general product you want to compare across purchases")
click.echo(" category: broad analysis bucket such as dairy, produce, or frozen")
def build_review_queue(purchase_rows, resolution_rows):
@@ -111,6 +120,10 @@ def save_catalog_rows(path, rows):
write_csv_rows(path, rows, build_purchases.CATALOG_FIELDS)
def save_link_rows(path, rows):
write_csv_rows(path, rows, build_purchases.PRODUCT_LINK_FIELDS)
def sort_related_items(rows):
return sorted(
rows,
@@ -123,6 +136,13 @@ def sort_related_items(rows):
)
def tokenize_match_text(*values):
tokens = set()
for value in values:
tokens.update(TOKEN_RE.findall((value or "").upper()))
return tokens
def build_catalog_suggestions(related_rows, purchase_rows, catalog_rows, limit=3):
normalized_names = {
row.get("normalized_item_name", "").strip().upper()
@@ -179,23 +199,122 @@ def build_catalog_suggestions(related_rows, purchase_rows, catalog_rows, limit=3
return suggestions
def search_catalog_rows(query, catalog_rows, purchase_rows, current_normalized_item_id, limit=10):
query_tokens = tokenize_match_text(query)
if not query_tokens:
return []
linked_purchase_counts = defaultdict(int)
linked_normalized_ids = defaultdict(set)
current_catalog_id = ""
for row in purchase_rows:
catalog_id = row.get("catalog_id", "")
normalized_item_id = row.get("normalized_item_id", "")
if catalog_id and normalized_item_id:
linked_purchase_counts[catalog_id] += 1
linked_normalized_ids[catalog_id].add(normalized_item_id)
if normalized_item_id == current_normalized_item_id and catalog_id:
current_catalog_id = catalog_id
ranked_rows = []
for row in catalog_rows:
catalog_id = row.get("catalog_id", "")
if not catalog_id or catalog_id == current_catalog_id:
continue
catalog_tokens = tokenize_match_text(
row.get("catalog_name", ""),
row.get("product_type", ""),
row.get("variant", ""),
)
overlap = query_tokens & catalog_tokens
if not overlap:
continue
ranked_rows.append(
{
"catalog_id": catalog_id,
"catalog_name": row.get("catalog_name", ""),
"product_type": row.get("product_type", ""),
"category": row.get("category", ""),
"variant": row.get("variant", ""),
"linked_normalized_items": len(linked_normalized_ids.get(catalog_id, set())),
"linked_purchase_rows": linked_purchase_counts.get(catalog_id, 0),
"score": len(overlap),
}
)
ranked_rows.sort(
key=lambda row: (-row["score"], row["catalog_name"], row["catalog_id"])
)
return ranked_rows[:limit]
def suggestion_display_rows(suggestions, purchase_rows, catalog_rows):
linked_purchase_counts = defaultdict(int)
linked_normalized_ids = defaultdict(set)
for row in purchase_rows:
catalog_id = row.get("catalog_id", "")
normalized_item_id = row.get("normalized_item_id", "")
if not catalog_id or not normalized_item_id:
continue
linked_purchase_counts[catalog_id] += 1
linked_normalized_ids[catalog_id].add(normalized_item_id)
display_rows = []
catalog_details = {
row["catalog_id"]: {
"product_type": row.get("product_type", ""),
"category": row.get("category", ""),
}
for row in catalog_rows
if row.get("catalog_id")
}
for row in purchase_rows:
if row.get("catalog_id"):
catalog_details.setdefault(
row["catalog_id"],
{
"product_type": row.get("product_type", ""),
"category": row.get("category", ""),
},
)
for row in suggestions:
catalog_id = row["catalog_id"]
details = catalog_details.get(catalog_id, {})
display_rows.append(
{
**row,
"product_type": details.get("product_type", ""),
"category": details.get("category", ""),
"linked_purchase_rows": linked_purchase_counts.get(catalog_id, 0),
"linked_normalized_items": len(linked_normalized_ids.get(catalog_id, set())),
}
)
return display_rows
def print_catalog_rows(rows):
for index, row in enumerate(rows, start=1):
click.echo(
f" [{index}] {row['catalog_name']}, {row.get('product_type', '')}, "
f"{row.get('category', '')} ({row['linked_normalized_items']} items, "
f"{row['linked_purchase_rows']} rows)"
)
def build_display_lines(related_rows):
lines = []
for index, row in enumerate(sort_related_items(related_rows), start=1):
lines.append(
" [{index}] {purchase_date} | {line_total} | {raw_item_name} | {normalized_item_name} | "
"{upc} | {retailer}".format(
" [{index}] {raw_item_name} | {retailer} | {purchase_date} | {line_total} | {image_url}".format(
index=index,
raw_item_name=row.get("raw_item_name", ""),
retailer=row.get("retailer", ""),
purchase_date=row.get("purchase_date", ""),
line_total=row.get("line_total", ""),
raw_item_name=row.get("raw_item_name", ""),
normalized_item_name=row.get("normalized_item_name", ""),
upc=row.get("upc", ""),
retailer=row.get("retailer", ""),
image_url=row.get("image_url", ""),
)
)
if row.get("image_url"):
lines.append(f" {row['image_url']}")
if not lines:
lines.append(" [1] no matched item rows found")
return lines
@@ -215,8 +334,7 @@ def choose_existing_catalog(display_rows, normalized_name, matched_count):
f"Select the catalog_name to associate {matched_count} items with:",
fg=INFO_COLOR,
)
for index, row in enumerate(display_rows, start=1):
click.echo(f" [{index}] {row['catalog_name']} | {row['catalog_id']}")
print_catalog_rows(display_rows)
choice = click.prompt(
click.style("selection", fg=PROMPT_COLOR),
type=click.IntRange(1, len(display_rows)),
@@ -241,13 +359,16 @@ def choose_existing_catalog(display_rows, normalized_name, matched_count):
def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total):
suggestions = build_catalog_suggestions(related_rows, purchase_rows, catalog_rows)
suggestions = suggestion_display_rows(
build_catalog_suggestions(related_rows, purchase_rows, catalog_rows),
purchase_rows,
catalog_rows,
)
normalized_name = normalized_label(queue_row, related_rows)
matched_count = len(related_rows)
click.echo("")
click.secho(
f"Review {queue_index}/{queue_total}: Resolve normalized_item {normalized_name} "
"to catalog_name [__]?",
f"Review {queue_index}/{queue_total}: {normalized_name}",
fg=INFO_COLOR,
)
click.echo(f"{matched_count} matched items:")
@@ -255,12 +376,30 @@ def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queu
click.echo(line)
if suggestions:
click.echo(f"{len(suggestions)} catalog_name suggestions found:")
for index, suggestion in enumerate(suggestions, start=1):
click.echo(f" [{index}] {suggestion['catalog_name']}")
print_catalog_rows(suggestions)
else:
click.echo("no catalog_name suggestions found")
click.secho("[l]ink existing [n]ew catalog e[x]clude [s]kip [q]uit:", fg=PROMPT_COLOR)
action = click.prompt("", type=click.Choice(["l", "n", "x", "s", "q"]), prompt_suffix=" ")
prompt_bits = []
if suggestions:
prompt_bits.append("[#] link to suggestion")
prompt_bits.extend(["[f]ind", "[n]ew", "[s]kip", "e[x]clude", "[q]uit"])
click.secho(" ".join(prompt_bits) + " >", fg=PROMPT_COLOR)
action = click.prompt("", type=str, prompt_suffix=" ").strip().lower()
if action.isdigit() and suggestions:
choice = int(action)
if 1 <= choice <= len(suggestions):
chosen_row = suggestions[choice - 1]
notes = click.prompt(click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False)
return {
"normalized_item_id": queue_row["normalized_item_id"],
"catalog_id": chosen_row["catalog_id"],
"resolution_action": "link",
"status": "approved",
"resolution_notes": notes,
"reviewed_at": str(date.today()),
}, None
click.secho("invalid suggestion number", fg=WARNING_COLOR)
return prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total)
if action == "q":
return None, None
if action == "s":
@@ -272,6 +411,43 @@ def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queu
"resolution_notes": queue_row.get("resolution_notes", ""),
"reviewed_at": str(date.today()),
}, None
if action == "f":
while True:
query = click.prompt(click.style("search", fg=PROMPT_COLOR), default="", show_default=False).strip()
if not query:
return prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total)
search_rows = search_catalog_rows(
query,
catalog_rows,
purchase_rows,
queue_row["normalized_item_id"],
)
if not search_rows:
click.echo("no matches found")
retry = click.prompt(
click.style("search again? [enter=yes, q=no]", fg=PROMPT_COLOR),
default="",
show_default=False,
).strip().lower()
if retry == "q":
return prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total)
continue
click.echo(f"{len(search_rows)} search results found:")
print_catalog_rows(search_rows)
choice = click.prompt(
click.style("selection", fg=PROMPT_COLOR),
type=click.IntRange(1, len(search_rows)),
)
chosen_row = search_rows[choice - 1]
notes = click.prompt(click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False)
return {
"normalized_item_id": queue_row["normalized_item_id"],
"catalog_id": chosen_row["catalog_id"],
"resolution_action": "link",
"status": "approved",
"resolution_notes": notes,
"reviewed_at": str(date.today()),
}, None
if action == "x":
notes = click.prompt(click.style("exclude notes", fg=PROMPT_COLOR), default="", show_default=False)
return {
@@ -282,45 +458,13 @@ def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queu
"resolution_notes": notes,
"reviewed_at": str(date.today()),
}, None
if action == "l":
display_rows = suggestions or [
{
"catalog_id": row["catalog_id"],
"catalog_name": row["catalog_name"],
"reason": "catalog sample",
}
for row in catalog_rows[:10]
if row.get("catalog_id")
]
while True:
catalog_id, outcome = choose_existing_catalog(display_rows, normalized_name, matched_count)
if outcome == "skip":
return {
"normalized_item_id": queue_row["normalized_item_id"],
"catalog_id": "",
"resolution_action": "skip",
"status": "pending",
"resolution_notes": queue_row.get("resolution_notes", ""),
"reviewed_at": str(date.today()),
}, None
if outcome == "quit":
return None, None
if outcome == "back":
continue
break
notes = click.prompt(click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False)
return {
"normalized_item_id": queue_row["normalized_item_id"],
"catalog_id": catalog_id,
"resolution_action": "link",
"status": "approved",
"resolution_notes": notes,
"reviewed_at": str(date.today()),
}, None
if action != "n":
click.secho("invalid action", fg=WARNING_COLOR)
return prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total)
catalog_name = click.prompt(click.style("catalog name", fg=PROMPT_COLOR), type=str)
category = click.prompt(click.style("category", fg=PROMPT_COLOR), default="", show_default=False)
product_type = click.prompt(click.style("product type", fg=PROMPT_COLOR), default="", show_default=False)
category = click.prompt(click.style("category", fg=PROMPT_COLOR), default="", show_default=False)
notes = click.prompt(click.style("notes", fg=PROMPT_COLOR), default="", show_default=False)
catalog_id = stable_id("cat", f"manual|{catalog_name}|{category}|{product_type}")
catalog_row = {
@@ -349,17 +493,41 @@ def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queu
return resolution_row, catalog_row
def apply_resolution_to_queue(queue_rows, resolution_lookup):
today_text = str(date.today())
updated_rows = []
for row in queue_rows:
resolution = resolution_lookup.get(row["normalized_item_id"], {})
row_copy = dict(row)
if resolution:
row_copy["catalog_id"] = resolution.get("catalog_id", "")
row_copy["status"] = resolution.get("status", row_copy.get("status", "pending"))
row_copy["resolution_action"] = resolution.get("resolution_action", "")
row_copy["resolution_notes"] = resolution.get("resolution_notes", "")
row_copy["updated_at"] = resolution.get("reviewed_at", today_text)
if resolution.get("status") == "approved":
row_copy["created_at"] = row_copy.get("created_at") or resolution.get("reviewed_at", today_text)
updated_rows.append(row_copy)
return updated_rows
def link_rows_from_state(link_lookup):
return sorted(link_lookup.values(), key=lambda row: row["normalized_item_id"])
@click.command()
@click.option("--purchases-csv", default="data/review/purchases.csv", show_default=True)
@click.option("--queue-csv", default="data/review/review_queue.csv", show_default=True)
@click.option("--resolutions-csv", default="data/review/review_resolutions.csv", show_default=True)
@click.option("--catalog-csv", default="data/catalog.csv", show_default=True)
@click.option("--links-csv", default="data/review/product_links.csv", show_default=True)
@click.option("--limit", default=0, show_default=True, type=int)
@click.option("--refresh-only", is_flag=True, help="Only rebuild review_queue.csv without prompting.")
def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, limit, refresh_only):
def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, links_csv, limit, refresh_only):
purchase_rows = build_purchases.read_optional_csv_rows(purchases_csv)
resolution_rows = build_purchases.read_optional_csv_rows(resolutions_csv)
catalog_rows = build_purchases.merge_catalog_rows(build_purchases.read_optional_csv_rows(catalog_csv), [])
link_lookup = build_purchases.load_link_lookup(build_purchases.read_optional_csv_rows(links_csv))
queue_rows = build_review_queue(purchase_rows, resolution_rows)
write_csv_rows(queue_csv, queue_rows, QUEUE_FIELDS)
click.echo(f"wrote {len(queue_rows)} rows to {queue_csv}")
@@ -367,6 +535,7 @@ def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, limit, refresh_
if refresh_only:
return
print_intro_text()
resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows)
catalog_by_id = {row["catalog_id"]: row for row in catalog_rows if row.get("catalog_id")}
rows_by_normalized = defaultdict(list)
@@ -388,16 +557,38 @@ def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, limit, refresh_
if catalog_row and catalog_row["catalog_id"] not in catalog_by_id:
catalog_by_id[catalog_row["catalog_id"]] = catalog_row
catalog_rows.append(catalog_row)
normalized_item_id = resolution_row["normalized_item_id"]
if resolution_row["status"] == "approved":
if resolution_row["resolution_action"] in {"link", "create"} and resolution_row.get("catalog_id"):
link_lookup[normalized_item_id] = {
"normalized_item_id": normalized_item_id,
"catalog_id": resolution_row["catalog_id"],
"link_method": f"manual_{resolution_row['resolution_action']}",
"link_confidence": "high",
"review_status": "approved",
"reviewed_by": "",
"reviewed_at": resolution_row.get("reviewed_at", ""),
"link_notes": resolution_row.get("resolution_notes", ""),
}
elif resolution_row["resolution_action"] == "exclude":
link_lookup.pop(normalized_item_id, None)
queue_rows = apply_resolution_to_queue(queue_rows, resolution_lookup)
write_csv_rows(queue_csv, queue_rows, QUEUE_FIELDS)
save_resolution_rows(
resolutions_csv,
sorted(resolution_lookup.values(), key=lambda row: row["normalized_item_id"]),
)
save_catalog_rows(catalog_csv, sorted(catalog_by_id.values(), key=lambda row: row["catalog_id"]))
save_link_rows(links_csv, link_rows_from_state(link_lookup))
reviewed += 1
save_resolution_rows(
resolutions_csv,
sorted(resolution_lookup.values(), key=lambda row: row["normalized_item_id"]),
)
save_resolution_rows(resolutions_csv, sorted(resolution_lookup.values(), key=lambda row: row["normalized_item_id"]))
save_catalog_rows(catalog_csv, sorted(catalog_by_id.values(), key=lambda row: row["catalog_id"]))
save_link_rows(links_csv, link_rows_from_state(link_lookup))
click.echo(
f"saved {len(resolution_lookup)} resolution rows to {resolutions_csv} "
f"and {len(catalog_by_id)} catalog rows to {catalog_csv}"
f"saved {len(resolution_lookup)} resolution rows to {resolutions_csv}, "
f"{len(catalog_by_id)} catalog rows to {catalog_csv}, "
f"and {len(link_lookup)} product links to {links_csv}"
)

View File

@@ -76,12 +76,44 @@ class ReviewWorkflowTests(unittest.TestCase):
self.assertEqual("cat_2", suggestions[0]["catalog_id"])
self.assertEqual("exact upc", suggestions[0]["reason"])
def test_search_catalog_rows_ranks_token_overlap(self):
results = review_products.search_catalog_rows(
"mixed pepper",
[
{
"catalog_id": "cat_1",
"catalog_name": "MIXED PEPPER",
"product_type": "pepper",
"category": "produce",
"variant": "",
},
{
"catalog_id": "cat_2",
"catalog_name": "GROUND PEPPER",
"product_type": "spice",
"category": "baking",
"variant": "",
},
],
[
{
"normalized_item_id": "gnorm_mix",
"catalog_id": "cat_1",
}
],
"cnorm_mix",
)
self.assertEqual("cat_1", results[0]["catalog_id"])
self.assertGreater(results[0]["score"], results[1]["score"])
def test_review_products_displays_position_items_and_suggestions(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"
queue_csv = Path(tmpdir) / "review_queue.csv"
resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
catalog_csv = Path(tmpdir) / "catalog.csv"
links_csv = Path(tmpdir) / "product_links.csv"
purchase_fields = [
"purchase_date",
@@ -176,21 +208,23 @@ class ReviewWorkflowTests(unittest.TestCase):
str(resolutions_csv),
"--catalog-csv",
str(catalog_csv),
"--links-csv",
str(links_csv),
],
input="q\n",
color=True,
)
self.assertEqual(0, result.exit_code)
self.assertIn("Review 1/1: Resolve normalized_item MIXED PEPPER to catalog_name [__]?", result.output)
self.assertIn("Review guide:", result.output)
self.assertIn("Review 1/1: MIXED PEPPER", result.output)
self.assertIn("2 matched items:", result.output)
self.assertIn("[l]ink existing [n]ew catalog e[x]clude [s]kip [q]uit:", result.output)
first_item = result.output.index("[1] 2026-03-14 | 7.49")
second_item = result.output.index("[2] 2026-03-12 | 6.99")
self.assertIn("[#] link to suggestion [f]ind [n]ew [s]kip e[x]clude [q]uit >", result.output)
first_item = result.output.index("[1] MIXED PEPPER 6-PACK | costco | 2026-03-14 | 7.49 | ")
second_item = result.output.index("[2] MIXED PEPPER 6-PACK | costco | 2026-03-12 | 6.99 | https://example.test/mixed-pepper.jpg")
self.assertLess(first_item, second_item)
self.assertIn("https://example.test/mixed-pepper.jpg", result.output)
self.assertIn("1 catalog_name suggestions found:", result.output)
self.assertIn("[1] MIXED PEPPER", result.output)
self.assertIn("[1] MIXED PEPPER, pepper, produce (1 items, 1 rows)", result.output)
self.assertIn("\x1b[", result.output)
def test_review_products_no_suggestions_is_informational(self):
@@ -199,6 +233,7 @@ class ReviewWorkflowTests(unittest.TestCase):
queue_csv = Path(tmpdir) / "review_queue.csv"
resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
catalog_csv = Path(tmpdir) / "catalog.csv"
links_csv = Path(tmpdir) / "product_links.csv"
with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(
@@ -249,6 +284,8 @@ class ReviewWorkflowTests(unittest.TestCase):
str(resolutions_csv),
"--catalog-csv",
str(catalog_csv),
"--links-csv",
str(links_csv),
],
input="q\n",
color=True,
@@ -257,12 +294,13 @@ class ReviewWorkflowTests(unittest.TestCase):
self.assertEqual(0, result.exit_code)
self.assertIn("no catalog_name suggestions found", result.output)
def test_link_existing_uses_numbered_selection_and_confirmation(self):
def test_search_links_catalog_and_writes_link_row(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"
queue_csv = Path(tmpdir) / "review_queue.csv"
resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
catalog_csv = Path(tmpdir) / "catalog.csv"
links_csv = Path(tmpdir) / "product_links.csv"
with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(
@@ -358,22 +396,180 @@ class ReviewWorkflowTests(unittest.TestCase):
str(resolutions_csv),
"--catalog-csv",
str(catalog_csv),
"--links-csv",
str(links_csv),
"--limit",
"1",
],
input="l\n1\ny\nlinked by test\n",
input="f\nmixed pepper\n1\nlinked by test\n",
color=True,
)
self.assertEqual(0, result.exit_code)
self.assertIn("Select the catalog_name to associate 2 items with:", result.output)
self.assertIn("[1] MIXED PEPPER | cat_mix", result.output)
self.assertIn('2 "MIXED PEPPER" items and future matches will be associated with "MIXED PEPPER".', result.output)
self.assertIn("actions: [y]es [n]o [b]ack [s]kip [q]uit", result.output)
self.assertIn("1 search results found:", result.output)
with resolutions_csv.open(newline="", encoding="utf-8") as handle:
rows = list(csv.DictReader(handle))
with links_csv.open(newline="", encoding="utf-8") as handle:
link_rows = list(csv.DictReader(handle))
self.assertEqual("cat_mix", rows[0]["catalog_id"])
self.assertEqual("link", rows[0]["resolution_action"])
self.assertEqual("cat_mix", link_rows[0]["catalog_id"])
def test_search_no_matches_allows_retry_or_return(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"
queue_csv = Path(tmpdir) / "review_queue.csv"
resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
catalog_csv = Path(tmpdir) / "catalog.csv"
links_csv = Path(tmpdir) / "product_links.csv"
with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(
handle,
fieldnames=[
"purchase_date",
"retailer",
"order_id",
"line_no",
"normalized_item_id",
"catalog_id",
"raw_item_name",
"normalized_item_name",
"image_url",
"upc",
"line_total",
],
)
writer.writeheader()
writer.writerow(
{
"purchase_date": "2026-03-14",
"retailer": "giant",
"order_id": "g1",
"line_no": "1",
"normalized_item_id": "gnorm_ice",
"catalog_id": "",
"raw_item_name": "SB BAGGED ICE 20LB",
"normalized_item_name": "BAGGED ICE",
"image_url": "",
"upc": "",
"line_total": "3.50",
}
)
with catalog_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=review_products.build_purchases.CATALOG_FIELDS)
writer.writeheader()
writer.writerow(
{
"catalog_id": "cat_ice",
"catalog_name": "ICE",
"category": "frozen",
"product_type": "ice",
"brand": "",
"variant": "",
"size_value": "",
"size_unit": "",
"pack_qty": "",
"measure_type": "",
"notes": "",
"created_at": "",
"updated_at": "",
}
)
result = CliRunner().invoke(
review_products.main,
[
"--purchases-csv",
str(purchases_csv),
"--queue-csv",
str(queue_csv),
"--resolutions-csv",
str(resolutions_csv),
"--catalog-csv",
str(catalog_csv),
"--links-csv",
str(links_csv),
],
input="f\nzzz\nq\nq\n",
color=True,
)
self.assertEqual(0, result.exit_code)
self.assertIn("no matches found", result.output)
def test_skip_remains_available_from_main_prompt(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"
queue_csv = Path(tmpdir) / "review_queue.csv"
resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
catalog_csv = Path(tmpdir) / "catalog.csv"
links_csv = Path(tmpdir) / "product_links.csv"
with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(
handle,
fieldnames=[
"purchase_date",
"retailer",
"order_id",
"line_no",
"normalized_item_id",
"catalog_id",
"raw_item_name",
"normalized_item_name",
"image_url",
"upc",
"line_total",
],
)
writer.writeheader()
writer.writerow(
{
"purchase_date": "2026-03-14",
"retailer": "giant",
"order_id": "g1",
"line_no": "1",
"normalized_item_id": "gnorm_skip",
"catalog_id": "",
"raw_item_name": "TEST ITEM",
"normalized_item_name": "TEST ITEM",
"image_url": "",
"upc": "",
"line_total": "1.00",
}
)
with catalog_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=review_products.build_purchases.CATALOG_FIELDS)
writer.writeheader()
result = CliRunner().invoke(
review_products.main,
[
"--purchases-csv",
str(purchases_csv),
"--queue-csv",
str(queue_csv),
"--resolutions-csv",
str(resolutions_csv),
"--catalog-csv",
str(catalog_csv),
"--links-csv",
str(links_csv),
"--limit",
"1",
],
input="s\n",
color=True,
)
self.assertEqual(0, result.exit_code)
with resolutions_csv.open(newline="", encoding="utf-8") as handle:
rows = list(csv.DictReader(handle))
self.assertEqual("skip", rows[0]["resolution_action"])
self.assertEqual("pending", rows[0]["status"])
def test_review_products_creates_catalog_and_resolution(self):
with tempfile.TemporaryDirectory() as tmpdir:
@@ -381,6 +577,7 @@ class ReviewWorkflowTests(unittest.TestCase):
queue_csv = Path(tmpdir) / "review_queue.csv"
resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
catalog_csv = Path(tmpdir) / "catalog.csv"
links_csv = Path(tmpdir) / "product_links.csv"
with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(
@@ -426,6 +623,7 @@ class ReviewWorkflowTests(unittest.TestCase):
queue_csv=str(queue_csv),
resolutions_csv=str(resolutions_csv),
catalog_csv=str(catalog_csv),
links_csv=str(links_csv),
limit=1,
refresh_only=False,
)
@@ -433,13 +631,21 @@ class ReviewWorkflowTests(unittest.TestCase):
self.assertTrue(queue_csv.exists())
self.assertTrue(resolutions_csv.exists())
self.assertTrue(catalog_csv.exists())
self.assertTrue(links_csv.exists())
with queue_csv.open(newline="", encoding="utf-8") as handle:
queue_rows = list(csv.DictReader(handle))
with resolutions_csv.open(newline="", encoding="utf-8") as handle:
resolution_rows = list(csv.DictReader(handle))
with catalog_csv.open(newline="", encoding="utf-8") as handle:
catalog_rows = list(csv.DictReader(handle))
with links_csv.open(newline="", encoding="utf-8") as handle:
link_rows = list(csv.DictReader(handle))
self.assertEqual("approved", queue_rows[0]["status"])
self.assertEqual("create", queue_rows[0]["resolution_action"])
self.assertEqual("create", resolution_rows[0]["resolution_action"])
self.assertEqual("approved", resolution_rows[0]["status"])
self.assertEqual("ICE", catalog_rows[0]["catalog_name"])
self.assertEqual(catalog_rows[0]["catalog_id"], link_rows[0]["catalog_id"])
if __name__ == "__main__":