"""Report row counts for each stage of the purchases pipeline.

Reads the collected, normalized, and review CSV files (a missing file is
treated as having no rows), computes one count per stage, and writes the
result as both CSV and JSON while echoing it to the terminal.
"""

import json
from pathlib import Path

import click

import build_purchases
import review_products
from layer_helpers import read_csv_rows, write_csv_rows

SUMMARY_FIELDS = ["stage", "count"]

# Columns whose value "true" keeps a purchase row out of the unresolved count.
_NON_PRODUCT_FLAGS = ("is_fee", "is_discount_line", "is_coupon_line")


def read_rows_if_exists(path):
    """Return the rows read from ``path``, or an empty list if it is absent.

    Args:
        path: Location of a CSV file, as a string or ``Path``.

    Returns:
        Whatever ``read_csv_rows`` returns for an existing path, else ``[]``.
    """
    csv_path = Path(path)
    return read_csv_rows(csv_path) if csv_path.exists() else []


def _is_unresolved_purchase(row):
    """Tell whether a purchase row counts as unresolved.

    A row is unresolved when it has a ``normalized_item_id``, has no
    ``catalog_id``, is not marked with the ``exclude`` resolution action, and
    none of the fee / discount / coupon flag columns equals ``"true"``.
    """
    if not row.get("normalized_item_id") or row.get("catalog_id"):
        return False
    if row.get("resolution_action") == "exclude":
        return False
    return all(row.get(flag) != "true" for flag in _NON_PRODUCT_FLAGS)


def build_status_summary(
    giant_orders,
    giant_items,
    giant_enriched,
    costco_orders,
    costco_items,
    costco_enriched,
    purchases,
    resolutions,
    links,
    catalog,
):
    """Compute the per-stage counts for the pipeline status report.

    Args:
        giant_orders: Collected order rows for Giant.
        giant_items: Collected item rows for Giant.
        giant_enriched: Normalized item rows for Giant.
        costco_orders: Collected order rows for Costco.
        costco_items: Collected item rows for Costco.
        costco_enriched: Normalized item rows for Costco.
        purchases: Final purchase rows (mappings supporting ``.get``).
        resolutions: Resolution rows handed to the review-queue builder.
        links: Product-link rows handed to the review-queue builder.
        catalog: Catalog rows handed to the review-queue builder.

    Returns:
        A list of ``{"stage": ..., "count": ...}`` dicts in a fixed order.
    """
    normalized_rows = giant_enriched + costco_enriched

    # The last argument is an empty list in this report; its meaning is
    # defined by review_products.build_review_queue (not visible here).
    queue_rows = review_products.build_review_queue(
        purchases, resolutions, links, catalog, []
    )
    queued_ids = {entry["normalized_item_id"] for entry in queue_rows}

    unresolved = [row for row in purchases if _is_unresolved_purchase(row)]
    excluded_total = sum(
        1 for row in purchases if row.get("resolution_action") == "exclude"
    )
    linked_row_total = sum(1 for row in purchases if row.get("catalog_id"))

    distinct_item_ids = {
        item_id
        for item_id in (row.get("normalized_item_id") for row in normalized_rows)
        if item_id
    }
    linked_item_ids = {
        row["normalized_item_id"]
        for row in purchases
        if row.get("catalog_id") and row.get("normalized_item_id")
    }

    # Unresolved rows whose item id never made it into the review queue.
    unresolved_outside_queue = sum(
        1 for row in unresolved if row.get("normalized_item_id") not in queued_ids
    )

    stage_counts = [
        ("raw_orders", len(giant_orders) + len(costco_orders)),
        ("raw_items", len(giant_items) + len(costco_items)),
        ("normalized_items", len(normalized_rows)),
        ("distinct_normalized_items", len(distinct_item_ids)),
        ("review_queue_normalized_items", len(queue_rows)),
        ("linked_normalized_items", len(linked_item_ids)),
        ("linked_purchase_rows", linked_row_total),
        ("final_purchase_rows", len(purchases)),
        ("unresolved_purchase_rows", len(unresolved)),
        ("excluded_purchase_rows", excluded_total),
        ("unresolved_not_in_review_rows", unresolved_outside_queue),
    ]
    return [{"stage": stage, "count": count} for stage, count in stage_counts]


# CLI entry point: loads every input CSV, builds the status summary, writes it
# to the CSV and JSON targets, and echoes one "stage: count" line per stage.
# Documented with comments rather than a docstring because click uses a
# command's docstring as its --help text.
@click.command()
@click.option("--giant-orders-csv", default="data/giant-web/collected_orders.csv", show_default=True)
@click.option("--giant-items-csv", default="data/giant-web/collected_items.csv", show_default=True)
@click.option("--giant-enriched-csv", default="data/giant-web/normalized_items.csv", show_default=True)
@click.option("--costco-orders-csv", default="data/costco-web/collected_orders.csv", show_default=True)
@click.option("--costco-items-csv", default="data/costco-web/collected_items.csv", show_default=True)
@click.option("--costco-enriched-csv", default="data/costco-web/normalized_items.csv", show_default=True)
@click.option("--purchases-csv", default="data/review/purchases.csv", show_default=True)
@click.option("--resolutions-csv", default="data/review/review_resolutions.csv", show_default=True)
@click.option("--links-csv", default="data/review/product_links.csv", show_default=True)
@click.option("--catalog-csv", default="data/catalog.csv", show_default=True)
@click.option("--summary-csv", default="data/review/pipeline_status.csv", show_default=True)
@click.option("--summary-json", default="data/review/pipeline_status.json", show_default=True)
def main(
    giant_orders_csv,
    giant_items_csv,
    giant_enriched_csv,
    costco_orders_csv,
    costco_items_csv,
    costco_enriched_csv,
    purchases_csv,
    resolutions_csv,
    links_csv,
    catalog_csv,
    summary_csv,
    summary_json,
):
    # Load the inputs in the same order they are passed to the summary builder.
    giant_orders = read_rows_if_exists(giant_orders_csv)
    giant_items = read_rows_if_exists(giant_items_csv)
    giant_enriched = read_rows_if_exists(giant_enriched_csv)
    costco_orders = read_rows_if_exists(costco_orders_csv)
    costco_items = read_rows_if_exists(costco_items_csv)
    costco_enriched = read_rows_if_exists(costco_enriched_csv)
    purchases = read_rows_if_exists(purchases_csv)

    # Review-side tables go through the build_purchases normalizers first.
    resolutions = [
        build_purchases.normalize_resolution_row(raw)
        for raw in read_rows_if_exists(resolutions_csv)
    ]
    links = [
        build_purchases.normalize_link_row(raw)
        for raw in read_rows_if_exists(links_csv)
    ]
    catalog = [
        build_purchases.normalize_catalog_row(raw)
        for raw in read_rows_if_exists(catalog_csv)
    ]

    summary_rows = build_status_summary(
        giant_orders,
        giant_items,
        giant_enriched,
        costco_orders,
        costco_items,
        costco_enriched,
        purchases,
        resolutions,
        links,
        catalog,
    )

    write_csv_rows(summary_csv, summary_rows, SUMMARY_FIELDS)

    # The JSON target's directory is created on demand before writing.
    json_target = Path(summary_json)
    json_target.parent.mkdir(parents=True, exist_ok=True)
    json_target.write_text(json.dumps(summary_rows, indent=2), encoding="utf-8")

    for entry in summary_rows:
        click.echo(f"{entry['stage']}: {entry['count']}")


if __name__ == "__main__":
    main()