From 967e19e5614467be295c751e4448f24521cf3a61 Mon Sep 17 00:00:00 2001 From: ben Date: Tue, 17 Mar 2026 15:07:42 -0400 Subject: [PATCH] Add pipeline status accounting --- report_pipeline_status.py | 119 ++++++++++++++++++++++++++++++++++ tests/test_pipeline_status.py | 80 +++++++++++++++++++++++ 2 files changed, 199 insertions(+) create mode 100644 report_pipeline_status.py create mode 100644 tests/test_pipeline_status.py diff --git a/report_pipeline_status.py b/report_pipeline_status.py new file mode 100644 index 0000000..594861e --- /dev/null +++ b/report_pipeline_status.py @@ -0,0 +1,119 @@ +import json +from pathlib import Path + +import click + +import build_observed_products +import build_purchases +import review_products +from layer_helpers import read_csv_rows, write_csv_rows + + +SUMMARY_FIELDS = ["stage", "count"] + + +def read_rows_if_exists(path): + path = Path(path) + if not path.exists(): + return [] + return read_csv_rows(path) + + +def build_status_summary( + giant_orders, + giant_items, + giant_enriched, + costco_orders, + costco_items, + costco_enriched, + purchases, + resolutions, +): + enriched_rows = giant_enriched + costco_enriched + observed_rows = build_observed_products.build_observed_products(enriched_rows) + queue_rows = review_products.build_review_queue(purchases, resolutions) + + unresolved_purchase_rows = [ + row + for row in purchases + if row.get("observed_product_id") + and not row.get("canonical_product_id") + and row.get("is_fee") != "true" + and row.get("is_discount_line") != "true" + and row.get("is_coupon_line") != "true" + ] + excluded_rows = [ + row + for row in purchases + if row.get("resolution_action") == "exclude" + ] + linked_purchase_rows = [row for row in purchases if row.get("canonical_product_id")] + + summary = [ + {"stage": "raw_orders", "count": len(giant_orders) + len(costco_orders)}, + {"stage": "raw_items", "count": len(giant_items) + len(costco_items)}, + {"stage": "enriched_items", "count": len(enriched_rows)}, + {"stage": "observed_products", "count": len(observed_rows)}, + {"stage": "review_queue_observed_products", "count": len(queue_rows)}, + {"stage": "canonical_linked_purchase_rows", "count": len(linked_purchase_rows)}, + {"stage": "final_purchase_rows", "count": len(purchases)}, + {"stage": "unresolved_purchase_rows", "count": len(unresolved_purchase_rows)}, + {"stage": "excluded_purchase_rows", "count": len(excluded_rows)}, + { + "stage": "unresolved_not_in_review_rows", + "count": len( + [ + row + for row in unresolved_purchase_rows + if row.get("observed_product_id") + not in {queue_row["observed_product_id"] for queue_row in queue_rows} + ] + ), + }, + ] + return summary + + +@click.command() +@click.option("--giant-orders-csv", default="giant_output/orders.csv", show_default=True) +@click.option("--giant-items-csv", default="giant_output/items.csv", show_default=True) +@click.option("--giant-enriched-csv", default="giant_output/items_enriched.csv", show_default=True) +@click.option("--costco-orders-csv", default="costco_output/orders.csv", show_default=True) +@click.option("--costco-items-csv", default="costco_output/items.csv", show_default=True) +@click.option("--costco-enriched-csv", default="costco_output/items_enriched.csv", show_default=True) +@click.option("--purchases-csv", default="combined_output/purchases.csv", show_default=True) +@click.option("--resolutions-csv", default="combined_output/review_resolutions.csv", show_default=True) +@click.option("--summary-csv", default="combined_output/pipeline_status.csv", show_default=True) +@click.option("--summary-json", default="combined_output/pipeline_status.json", show_default=True) +def main( + giant_orders_csv, + giant_items_csv, + giant_enriched_csv, + costco_orders_csv, + costco_items_csv, + costco_enriched_csv, + purchases_csv, + resolutions_csv, + summary_csv, + summary_json, +): + summary_rows = build_status_summary( + read_rows_if_exists(giant_orders_csv), + read_rows_if_exists(giant_items_csv), + read_rows_if_exists(giant_enriched_csv), + read_rows_if_exists(costco_orders_csv), + read_rows_if_exists(costco_items_csv), + read_rows_if_exists(costco_enriched_csv), + read_rows_if_exists(purchases_csv), + read_rows_if_exists(resolutions_csv), + ) + write_csv_rows(summary_csv, summary_rows, SUMMARY_FIELDS) + summary_json_path = Path(summary_json) + summary_json_path.parent.mkdir(parents=True, exist_ok=True) + summary_json_path.write_text(json.dumps(summary_rows, indent=2), encoding="utf-8") + for row in summary_rows: + click.echo(f"{row['stage']}: {row['count']}") + + +if __name__ == "__main__": + main() diff --git a/tests/test_pipeline_status.py b/tests/test_pipeline_status.py new file mode 100644 index 0000000..573608d --- /dev/null +++ b/tests/test_pipeline_status.py @@ -0,0 +1,80 @@ +import unittest + +import report_pipeline_status + + +class PipelineStatusTests(unittest.TestCase): + def test_build_status_summary_reports_unresolved_and_reviewed_counts(self): + summary = report_pipeline_status.build_status_summary( + giant_orders=[{"order_id": "g1"}], + giant_items=[{"order_id": "g1", "line_no": "1"}], + giant_enriched=[ + { + "retailer": "giant", + "order_id": "g1", + "line_no": "1", + "item_name_norm": "BANANA", + "item_name": "FRESH BANANA", + "retailer_item_id": "1", + "upc": "4011", + "brand_guess": "", + "variant": "", + "size_value": "", + "size_unit": "", + "pack_qty": "", + "measure_type": "weight", + "image_url": "", + "is_store_brand": "false", + "is_fee": "false", + "is_discount_line": "false", + "is_coupon_line": "false", + "order_date": "2026-03-01", + "line_total": "1.29", + } + ], + costco_orders=[], + costco_items=[], + costco_enriched=[], + purchases=[ + { + "observed_product_id": "gobs_banana", + "canonical_product_id": "gcan_banana", + "resolution_action": "", + "is_fee": "false", + "is_discount_line": "false", + "is_coupon_line": "false", + "retailer": "giant", + "raw_item_name": "FRESH BANANA", + "normalized_item_name": "BANANA", + "upc": "4011", + "line_total": "1.29", + }, + { + "observed_product_id": "gobs_lime", + "canonical_product_id": "", + "resolution_action": "", + "is_fee": "false", + "is_discount_line": "false", + "is_coupon_line": "false", + "retailer": "costco", + "raw_item_name": "LIME 5LB", + "normalized_item_name": "LIME", + "upc": "", + "line_total": "4.99", + }, + ], + resolutions=[], + ) + + counts = {row["stage"]: row["count"] for row in summary} + self.assertEqual(1, counts["raw_orders"]) + self.assertEqual(1, counts["raw_items"]) + self.assertEqual(1, counts["enriched_items"]) + self.assertEqual(1, counts["canonical_linked_purchase_rows"]) + self.assertEqual(1, counts["unresolved_purchase_rows"]) + self.assertEqual(1, counts["review_queue_observed_products"]) + self.assertEqual(0, counts["unresolved_not_in_review_rows"]) + + +if __name__ == "__main__": + unittest.main()