from collections import defaultdict
from datetime import date

import click

from layer_helpers import compact_join, distinct_values, read_csv_rows, stable_id, write_csv_rows


# Column order for the review-queue CSV output.
OUTPUT_FIELDS = [
    "review_id",
    "queue_type",
    "retailer",
    "observed_product_id",
    "canonical_product_id",
    "reason_code",
    "priority",
    "raw_item_names",
    "normalized_names",
    "upc",
    "image_url",
    "example_prices",
    "seen_count",
    "status",
    "resolution_notes",
    "created_at",
    "updated_at",
]

# Explicit ordering for priorities. Sorting on the raw strings would order
# them alphabetically ("high" < "low" < "medium"), putting low-priority rows
# ahead of medium-priority ones. Unknown priorities sort last.
_PRIORITY_RANK = {"high": 0, "medium": 1, "low": 2}


def existing_review_state(path):
    """Return prior review rows keyed by review_id, or {} when no file exists yet.

    Used to carry human decisions (status, notes, created_at) forward across
    rebuilds of the queue.
    """
    try:
        rows = read_csv_rows(path)
    except FileNotFoundError:
        return {}
    return {row["review_id"]: row for row in rows}


def review_reasons(observed_row):
    """Return (reason_code, priority) pairs flagging an observed product for review.

    Fee lines are never queued. A single product can yield several reasons,
    each of which becomes its own queue row downstream.
    """
    reasons = []
    if observed_row["is_fee"] == "true":
        return reasons
    # Counts of "" / "0" / "1" are unambiguous; anything else means conflicting data.
    if observed_row["distinct_upcs_count"] not in {"", "0", "1"}:
        reasons.append(("multiple_upcs", "high"))
    if observed_row["distinct_item_names_count"] not in {"", "0", "1"}:
        reasons.append(("multiple_raw_names", "medium"))
    if not observed_row["representative_image_url"]:
        reasons.append(("missing_image", "medium"))
    if not observed_row["representative_upc"]:
        reasons.append(("missing_upc", "high"))
    if not observed_row["representative_name_norm"]:
        reasons.append(("missing_normalized_name", "high"))
    return reasons


def build_review_queue(observed_rows, item_rows, existing_rows, today_text):
    """Build the sorted list of review-queue row dicts.

    Args:
        observed_rows: observed-product dicts (one per distinct product).
        item_rows: enriched item dicts, each carrying an observed_product_id.
        existing_rows: prior queue rows keyed by review_id; human-entered
            fields (status, resolution_notes, created_at, canonical_product_id)
            are preserved from these.
        today_text: ISO date string stamped on created_at/updated_at.

    Returns:
        Queue rows sorted by priority (high first), reason code, review id.
    """
    by_observed = defaultdict(list)
    for row in item_rows:
        observed_id = row.get("observed_product_id", "")
        if observed_id:
            by_observed[observed_id].append(row)

    queue_rows = []
    for observed_row in observed_rows:
        reasons = review_reasons(observed_row)
        if not reasons:
            continue

        related_items = by_observed.get(observed_row["observed_product_id"], [])
        raw_names = compact_join(distinct_values(related_items, "item_name"), limit=5)
        norm_names = compact_join(
            distinct_values(related_items, "item_name_norm"), limit=5
        )
        example_prices = compact_join(
            distinct_values(related_items, "line_total"), limit=5
        )

        # One queue row per reason; the id is stable across rebuilds so prior
        # human decisions can be re-attached.
        for reason_code, priority in reasons:
            review_id = stable_id(
                "rvw",
                f"{observed_row['observed_product_id']}|{reason_code}",
            )
            prior = existing_rows.get(review_id, {})
            queue_rows.append(
                {
                    "review_id": review_id,
                    "queue_type": "observed_product",
                    "retailer": observed_row["retailer"],
                    "observed_product_id": observed_row["observed_product_id"],
                    "canonical_product_id": prior.get("canonical_product_id", ""),
                    "reason_code": reason_code,
                    "priority": priority,
                    "raw_item_names": raw_names,
                    "normalized_names": norm_names,
                    "upc": observed_row["representative_upc"],
                    "image_url": observed_row["representative_image_url"],
                    "example_prices": example_prices,
                    "seen_count": observed_row["times_seen"],
                    "status": prior.get("status", "pending"),
                    "resolution_notes": prior.get("resolution_notes", ""),
                    "created_at": prior.get("created_at", today_text),
                    "updated_at": today_text,
                }
            )

    # Fix: rank priorities explicitly instead of sorting the strings, which
    # would put "low" before "medium" alphabetically.
    queue_rows.sort(
        key=lambda row: (
            _PRIORITY_RANK.get(row["priority"], len(_PRIORITY_RANK)),
            row["reason_code"],
            row["review_id"],
        )
    )
    return queue_rows


def attach_observed_ids(item_rows, observed_rows):
    """Return copies of item rows stamped with their observed_product_id.

    The join key mirrors the one used when observed products were built:
    UPC-based when the item has a UPC, otherwise an attribute-based key.
    Items with no match get an empty observed_product_id.
    """
    observed_by_key = {row["observed_key"]: row["observed_product_id"] for row in observed_rows}
    attached = []
    for row in item_rows:
        if row.get("upc"):
            observed_key = "|".join(
                [
                    row["retailer"],
                    f"upc={row['upc']}",
                    f"name={row['item_name_norm']}",
                ]
            )
        else:
            observed_key = "|".join(
                [
                    row["retailer"],
                    f"name={row['item_name_norm']}",
                    f"size={row['size_value']}",
                    f"unit={row['size_unit']}",
                    f"pack={row['pack_qty']}",
                    f"measure={row['measure_type']}",
                    f"store_brand={row['is_store_brand']}",
                    f"fee={row['is_fee']}",
                ]
            )
        enriched = dict(row)
        enriched["observed_product_id"] = observed_by_key.get(observed_key, "")
        attached.append(enriched)
    return attached
@click.command()
@click.option(
    "--observed-csv",
    default="giant_output/products_observed.csv",
    show_default=True,
    help="Path to observed product rows.",
)
@click.option(
    "--items-enriched-csv",
    default="giant_output/items_enriched.csv",
    show_default=True,
    help="Path to enriched Giant item rows.",
)
@click.option(
    "--output-csv",
    default="giant_output/review_queue.csv",
    show_default=True,
    help="Path to review queue output.",
)
def main(observed_csv, items_enriched_csv, output_csv):
    """Build the manual review queue CSV from observed products and items.

    Re-reads any existing output first so human-entered review state
    (status, notes, created_at) survives rebuilds.
    """
    observed_rows = read_csv_rows(observed_csv)
    item_rows = read_csv_rows(items_enriched_csv)
    item_rows = attach_observed_ids(item_rows, observed_rows)
    existing_rows = existing_review_state(output_csv)
    today_text = str(date.today())
    queue_rows = build_review_queue(observed_rows, item_rows, existing_rows, today_text)
    write_csv_rows(output_csv, queue_rows, OUTPUT_FIELDS)
    click.echo(f"wrote {len(queue_rows)} rows to {output_csv}")


if __name__ == "__main__":
    main()


# --- tests/test_review_queue.py ---
import tempfile
import unittest
from pathlib import Path

import build_observed_products
import build_review_queue
from layer_helpers import write_csv_rows


class ReviewQueueTests(unittest.TestCase):
    """Tests for review-queue construction and the CLI entry point."""

    def test_build_review_queue_preserves_existing_status(self):
        """Prior human review state is carried forward for matching review ids."""
        observed_rows = [
            {
                "observed_product_id": "gobs_1",
                "retailer": "giant",
                "representative_upc": "111",
                "representative_image_url": "",
                "representative_name_norm": "GALA APPLE",
                "times_seen": "2",
                "distinct_item_names_count": "2",
                "distinct_upcs_count": "1",
                "is_fee": "false",
            }
        ]
        item_rows = [
            {
                "observed_product_id": "gobs_1",
                "item_name": "SB GALA APPLE 5LB",
                "item_name_norm": "GALA APPLE",
                "line_total": "7.99",
            },
            {
                "observed_product_id": "gobs_1",
                "item_name": "SB GALA APPLE 5 LB",
                "item_name_norm": "GALA APPLE",
                "line_total": "8.49",
            },
        ]
        existing = {
            build_review_queue.stable_id("rvw", "gobs_1|missing_image"): {
                "status": "approved",
                "resolution_notes": "looked fine",
                "created_at": "2026-03-15",
            }
        }

        queue = build_review_queue.build_review_queue(
            observed_rows, item_rows, existing, "2026-03-16"
        )

        # Two reasons fire: multiple_raw_names and missing_image.
        self.assertEqual(2, len(queue))
        missing_image = [row for row in queue if row["reason_code"] == "missing_image"][0]
        self.assertEqual("approved", missing_image["status"])
        self.assertEqual("looked fine", missing_image["resolution_notes"])

    def test_review_queue_main_writes_output(self):
        """End-to-end: the CLI reads both CSVs and writes a review queue file."""
        with tempfile.TemporaryDirectory() as tmpdir:
            observed_path = Path(tmpdir) / "products_observed.csv"
            items_path = Path(tmpdir) / "items_enriched.csv"
            output_path = Path(tmpdir) / "review_queue.csv"

            observed_rows = [
                {
                    "observed_product_id": "gobs_1",
                    "retailer": "giant",
                    "observed_key": "giant|upc=111|name=GALA APPLE",
                    "representative_upc": "111",
                    "representative_item_name": "SB GALA APPLE 5LB",
                    "representative_name_norm": "GALA APPLE",
                    "representative_brand": "SB",
                    "representative_variant": "",
                    "representative_size_value": "5",
                    "representative_size_unit": "lb",
                    "representative_pack_qty": "",
                    "representative_measure_type": "weight",
                    "representative_image_url": "",
                    "is_store_brand": "true",
                    "is_fee": "false",
                    "first_seen_date": "2026-01-01",
                    "last_seen_date": "2026-01-10",
                    "times_seen": "2",
                    "example_order_id": "1",
                    "example_item_name": "SB GALA APPLE 5LB",
                    "raw_name_examples": "SB GALA APPLE 5LB | SB GALA APPLE 5 LB",
                    "normalized_name_examples": "GALA APPLE",
                    "example_prices": "7.99 | 8.49",
                    "distinct_item_names_count": "2",
                    "distinct_upcs_count": "1",
                }
            ]
            item_rows = [
                {
                    "retailer": "giant",
                    "order_id": "1",
                    "line_no": "1",
                    "item_name": "SB GALA APPLE 5LB",
                    "item_name_norm": "GALA APPLE",
                    "upc": "111",
                    "size_value": "5",
                    "size_unit": "lb",
                    "pack_qty": "",
                    "measure_type": "weight",
                    "is_store_brand": "true",
                    "is_fee": "false",
                    "line_total": "7.99",
                }
            ]

            write_csv_rows(
                observed_path, observed_rows, build_observed_products.OUTPUT_FIELDS
            )
            write_csv_rows(items_path, item_rows, list(item_rows[0].keys()))

            # Invoke the undecorated function directly, bypassing click's CLI layer.
            build_review_queue.main.callback(
                observed_csv=str(observed_path),
                items_enriched_csv=str(items_path),
                output_csv=str(output_path),
            )

            self.assertTrue(output_path.exists())


if __name__ == "__main__":
    unittest.main()