"""Build the product review queue CSV from observed-product and item rows.

Reads the observed product rows and the enriched item rows, decides which
observed products need human review, and writes one queue row per
(observed product, reason) pair. Review state already present in the output
file (status, notes, canonical id, creation date) is carried forward.
"""

from collections import defaultdict
from datetime import date

import click

from layer_helpers import (
    compact_join,
    distinct_values,
    read_csv_rows,
    stable_id,
    write_csv_rows,
)

OUTPUT_FIELDS = [
    "review_id",
    "queue_type",
    "retailer",
    "observed_product_id",
    "canonical_product_id",
    "reason_code",
    "priority",
    "raw_item_names",
    "normalized_names",
    "upc",
    "image_url",
    "example_prices",
    "seen_count",
    "status",
    "resolution_notes",
    "created_at",
    "updated_at",
]

# Explicit ordering for the queue sort. Sorting on the raw priority string
# only works while the labels happen to be alphabetically ordered
# ("high" < "medium"); a "low" label would sort between them.
_PRIORITY_RANK = {"high": 0, "medium": 1, "low": 2}


def existing_review_state(path):
    """Load previously written review rows keyed by ``review_id``.

    Args:
        path: Path to an existing review queue CSV.

    Returns:
        A dict mapping each row's ``review_id`` to the full row, or an empty
        dict when the file does not exist yet (e.g. first run).
    """
    try:
        rows = read_csv_rows(path)
    except FileNotFoundError:
        return {}
    return {row["review_id"]: row for row in rows}


def review_reasons(observed_row):
    """Return the ``(reason_code, priority)`` pairs that apply to a product.

    Args:
        observed_row: One observed-product row (string-valued mapping).

    Returns:
        A list of ``(reason_code, priority)`` tuples; empty when the row is a
        fee, discount, or coupon line, or when no check fires.
    """
    is_excluded = (
        observed_row["is_fee"] == "true"
        or observed_row.get("is_discount_line") == "true"
        or observed_row.get("is_coupon_line") == "true"
    )
    if is_excluded:
        # Fee / discount / coupon lines are never queued for review.
        return []

    # Counts arrive as CSV strings; blank, "0" and "1" all mean "not multiple".
    single_or_unknown = {"", "0", "1"}

    reasons = []
    if observed_row["distinct_upcs_count"] not in single_or_unknown:
        reasons.append(("multiple_upcs", "high"))
    if observed_row["distinct_item_names_count"] not in single_or_unknown:
        reasons.append(("multiple_raw_names", "medium"))
    if not observed_row["representative_image_url"]:
        reasons.append(("missing_image", "medium"))
    if not observed_row["representative_upc"]:
        reasons.append(("missing_upc", "high"))
    if not observed_row["representative_name_norm"]:
        reasons.append(("missing_normalized_name", "high"))
    return reasons


def _queue_sort_key(row):
    """Sort key for queue rows: priority rank, then reason code, then id."""
    priority = row["priority"]
    # Unknown labels sort after every known one; the label itself is kept as
    # a tie-breaker so the order stays deterministic for them.
    rank = _PRIORITY_RANK.get(priority, len(_PRIORITY_RANK))
    return (rank, priority, row["reason_code"], row["review_id"])


def build_review_queue(observed_rows, item_rows, existing_rows, today_text):
    """Build the review queue rows for every observed product needing review.

    Args:
        observed_rows: Observed-product rows.
        item_rows: Item rows that already carry ``observed_product_id``
            (see ``attach_observed_ids``); rows with a blank id are ignored.
        existing_rows: Mapping of ``review_id`` to a previously written queue
            row, as returned by ``existing_review_state``.
        today_text: Date string stamped into ``updated_at`` and, for new
            rows, ``created_at``.

    Returns:
        A list of dicts keyed by ``OUTPUT_FIELDS``, one per
        (observed product, reason), ordered by priority (high first), then
        reason code, then review id.
    """
    by_observed = defaultdict(list)
    for row in item_rows:
        observed_id = row.get("observed_product_id", "")
        if observed_id:
            by_observed[observed_id].append(row)

    queue_rows = []
    for observed_row in observed_rows:
        reasons = review_reasons(observed_row)
        if not reasons:
            continue

        observed_id = observed_row["observed_product_id"]
        # .get() so that looking up an unseen id does not insert an empty
        # list into the defaultdict.
        related_items = by_observed.get(observed_id, [])

        # Example values are computed once per product and shared by all of
        # its reason rows.
        raw_names = compact_join(
            distinct_values(related_items, "item_name"), limit=5
        )
        norm_names = compact_join(
            distinct_values(related_items, "item_name_norm"), limit=5
        )
        example_prices = compact_join(
            distinct_values(related_items, "line_total"), limit=5
        )

        for reason_code, priority in reasons:
            review_id = stable_id("rvw", f"{observed_id}|{reason_code}")
            # Carry forward reviewer-owned fields from the previous run.
            prior = existing_rows.get(review_id, {})
            queue_rows.append(
                {
                    "review_id": review_id,
                    "queue_type": "observed_product",
                    "retailer": observed_row["retailer"],
                    "observed_product_id": observed_id,
                    "canonical_product_id": prior.get(
                        "canonical_product_id", ""
                    ),
                    "reason_code": reason_code,
                    "priority": priority,
                    "raw_item_names": raw_names,
                    "normalized_names": norm_names,
                    "upc": observed_row["representative_upc"],
                    "image_url": observed_row["representative_image_url"],
                    "example_prices": example_prices,
                    "seen_count": observed_row["times_seen"],
                    "status": prior.get("status", "pending"),
                    "resolution_notes": prior.get("resolution_notes", ""),
                    "created_at": prior.get("created_at", today_text),
                    "updated_at": today_text,
                }
            )

    queue_rows.sort(key=_queue_sort_key)
    return queue_rows


def _observed_key(row):
    """Return the lookup key used to match an item row to an observed product.

    Rows with a UPC are keyed on retailer, UPC and normalized name; rows
    without one fall back to a longer key built from the retailer item id and
    the item's size/pack/flag attributes.
    NOTE(review): presumably this must stay in sync with whatever produced
    the ``observed_key`` column of the observed-products CSV -- confirm.
    """
    if row.get("upc"):
        parts = [
            row["retailer"],
            f"upc={row['upc']}",
            f"name={row['item_name_norm']}",
        ]
    else:
        parts = [
            row["retailer"],
            f"retailer_item_id={row.get('retailer_item_id', '')}",
            f"name={row['item_name_norm']}",
            f"size={row['size_value']}",
            f"unit={row['size_unit']}",
            f"pack={row['pack_qty']}",
            f"measure={row['measure_type']}",
            f"store_brand={row['is_store_brand']}",
            f"fee={row['is_fee']}",
            f"discount={row.get('is_discount_line', 'false')}",
            f"coupon={row.get('is_coupon_line', 'false')}",
        ]
    return "|".join(parts)


def attach_observed_ids(item_rows, observed_rows):
    """Return copies of ``item_rows`` with ``observed_product_id`` filled in.

    Args:
        item_rows: Enriched item rows.
        observed_rows: Observed-product rows providing ``observed_key`` and
            ``observed_product_id``.

    Returns:
        A new list of row dicts; the input rows are not mutated. Items whose
        key matches no observed product get an empty ``observed_product_id``.
    """
    observed_by_key = {
        row["observed_key"]: row["observed_product_id"]
        for row in observed_rows
    }

    attached = []
    for row in item_rows:
        enriched = dict(row)
        enriched["observed_product_id"] = observed_by_key.get(
            _observed_key(row), ""
        )
        attached.append(enriched)
    return attached


@click.command()
@click.option(
    "--observed-csv",
    default="giant_output/products_observed.csv",
    show_default=True,
    help="Path to observed product rows.",
)
@click.option(
    "--items-enriched-csv",
    default="giant_output/items_enriched.csv",
    show_default=True,
    help="Path to enriched Giant item rows.",
)
@click.option(
    "--output-csv",
    default="giant_output/review_queue.csv",
    show_default=True,
    help="Path to review queue output.",
)
def main(observed_csv, items_enriched_csv, output_csv):
    # CLI entry point: read both inputs, rebuild the queue, overwrite the
    # output CSV. Documented with comments rather than a docstring so that
    # click's --help text stays exactly as it was.
    observed_rows = read_csv_rows(observed_csv)
    item_rows = read_csv_rows(items_enriched_csv)
    item_rows = attach_observed_ids(item_rows, observed_rows)

    # The previous output is read before it is overwritten so reviewer
    # decisions survive a regeneration.
    existing_rows = existing_review_state(output_csv)
    today_text = str(date.today())

    queue_rows = build_review_queue(
        observed_rows, item_rows, existing_rows, today_text
    )
    write_csv_rows(output_csv, queue_rows, OUTPUT_FIELDS)
    click.echo(f"wrote {len(queue_rows)} rows to {output_csv}")


if __name__ == "__main__":
    main()