Add pipeline status accounting
This commit is contained in:
119
report_pipeline_status.py
Normal file
119
report_pipeline_status.py
Normal file
@@ -0,0 +1,119 @@
|
|||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import click
|
||||||
|
|
||||||
|
import build_observed_products
|
||||||
|
import build_purchases
|
||||||
|
import review_products
|
||||||
|
from layer_helpers import read_csv_rows, write_csv_rows
|
||||||
|
|
||||||
|
|
||||||
|
# Keys of each summary row; main() passes this list to write_csv_rows as the
# field list for the summary CSV.
SUMMARY_FIELDS = ["stage", "count"]
|
||||||
|
|
||||||
|
|
||||||
|
def read_rows_if_exists(path):
    """Return the CSV rows found at ``path``, or ``[]`` when nothing exists there.

    ``path`` may be a string or a ``Path``; it is normalized to a ``Path``
    before being checked and handed to ``read_csv_rows``.
    """
    csv_path = Path(path)
    # A missing input is treated as "no rows" rather than as an error.
    return read_csv_rows(csv_path) if csv_path.exists() else []
|
||||||
|
|
||||||
|
|
||||||
|
def build_status_summary(
    giant_orders,
    giant_items,
    giant_enriched,
    costco_orders,
    costco_items,
    costco_enriched,
    purchases,
    resolutions,
):
    """Count rows at each pipeline stage and return the counts as summary rows.

    Args:
        giant_orders: Giant order rows; only their number is used.
        giant_items: Giant item rows; only their number is used.
        giant_enriched: List of enriched Giant item rows.
        costco_orders: Costco order rows; only their number is used.
        costco_items: Costco item rows; only their number is used.
        costco_enriched: List of enriched Costco item rows; concatenated
            with ``giant_enriched`` before observed products are built.
        purchases: Purchase rows as dicts. The keys read here are
            ``observed_product_id``, ``canonical_product_id``,
            ``resolution_action``, ``is_fee``, ``is_discount_line`` and
            ``is_coupon_line``.
        resolutions: Review resolution rows, passed unchanged to
            ``review_products.build_review_queue``.

    Returns:
        A list of ``{"stage": <name>, "count": <int>}`` dicts in a fixed
        stage order.
    """
    enriched_rows = giant_enriched + costco_enriched
    observed_rows = build_observed_products.build_observed_products(enriched_rows)
    queue_rows = review_products.build_review_queue(purchases, resolutions)

    # A purchase is "unresolved" when it has an observed product but no
    # canonical product, and is not a fee, discount, or coupon line.
    unresolved_purchase_rows = [
        row
        for row in purchases
        if row.get("observed_product_id")
        and not row.get("canonical_product_id")
        and row.get("is_fee") != "true"
        and row.get("is_discount_line") != "true"
        and row.get("is_coupon_line") != "true"
    ]
    excluded_rows = [
        row
        for row in purchases
        if row.get("resolution_action") == "exclude"
    ]
    linked_purchase_rows = [row for row in purchases if row.get("canonical_product_id")]

    # Build the lookup set once; constructing it inside the filter below
    # would rebuild it for every unresolved row.
    queued_product_ids = {queue_row["observed_product_id"] for queue_row in queue_rows}
    unresolved_not_in_review_rows = [
        row
        for row in unresolved_purchase_rows
        if row.get("observed_product_id") not in queued_product_ids
    ]

    summary = [
        {"stage": "raw_orders", "count": len(giant_orders) + len(costco_orders)},
        {"stage": "raw_items", "count": len(giant_items) + len(costco_items)},
        {"stage": "enriched_items", "count": len(enriched_rows)},
        {"stage": "observed_products", "count": len(observed_rows)},
        {"stage": "review_queue_observed_products", "count": len(queue_rows)},
        {"stage": "canonical_linked_purchase_rows", "count": len(linked_purchase_rows)},
        {"stage": "final_purchase_rows", "count": len(purchases)},
        {"stage": "unresolved_purchase_rows", "count": len(unresolved_purchase_rows)},
        {"stage": "excluded_purchase_rows", "count": len(excluded_rows)},
        {
            "stage": "unresolved_not_in_review_rows",
            "count": len(unresolved_not_in_review_rows),
        },
    ]
    return summary
|
||||||
|
|
||||||
|
|
||||||
|
# CLI entry point: load every pipeline CSV that exists, compute the stage
# counts, write them as CSV and JSON, and echo them to the terminal.
# Documented with comments rather than a docstring because click shows a
# command function's docstring as its --help text.
@click.command()
@click.option("--giant-orders-csv", default="giant_output/orders.csv", show_default=True)
@click.option("--giant-items-csv", default="giant_output/items.csv", show_default=True)
@click.option("--giant-enriched-csv", default="giant_output/items_enriched.csv", show_default=True)
@click.option("--costco-orders-csv", default="costco_output/orders.csv", show_default=True)
@click.option("--costco-items-csv", default="costco_output/items.csv", show_default=True)
@click.option("--costco-enriched-csv", default="costco_output/items_enriched.csv", show_default=True)
@click.option("--purchases-csv", default="combined_output/purchases.csv", show_default=True)
@click.option("--resolutions-csv", default="combined_output/review_resolutions.csv", show_default=True)
@click.option("--summary-csv", default="combined_output/pipeline_status.csv", show_default=True)
@click.option("--summary-json", default="combined_output/pipeline_status.json", show_default=True)
def main(
    giant_orders_csv,
    giant_items_csv,
    giant_enriched_csv,
    costco_orders_csv,
    costco_items_csv,
    costco_enriched_csv,
    purchases_csv,
    resolutions_csv,
    summary_csv,
    summary_json,
):
    # Listed in the positional order build_status_summary expects.
    input_paths = (
        giant_orders_csv,
        giant_items_csv,
        giant_enriched_csv,
        costco_orders_csv,
        costco_items_csv,
        costco_enriched_csv,
        purchases_csv,
        resolutions_csv,
    )
    loaded_inputs = [read_rows_if_exists(input_path) for input_path in input_paths]
    summary_rows = build_status_summary(*loaded_inputs)

    write_csv_rows(summary_csv, summary_rows, SUMMARY_FIELDS)

    # Make sure the JSON output directory exists before writing into it.
    json_target = Path(summary_json)
    json_target.parent.mkdir(parents=True, exist_ok=True)
    json_payload = json.dumps(summary_rows, indent=2)
    json_target.write_text(json_payload, encoding="utf-8")

    for entry in summary_rows:
        click.echo(f"{entry['stage']}: {entry['count']}")
|
||||||
|
|
||||||
|
|
||||||
|
# Invoke the click command only when this file is run as a script.
if __name__ == "__main__":
    main()
|
||||||
80
tests/test_pipeline_status.py
Normal file
80
tests/test_pipeline_status.py
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
import unittest
|
||||||
|
|
||||||
|
import report_pipeline_status
|
||||||
|
|
||||||
|
|
||||||
|
class PipelineStatusTests(unittest.TestCase):
    """Checks the stage counts returned by ``build_status_summary``."""

    def test_build_status_summary_reports_unresolved_and_reviewed_counts(self):
        """One linked and one unlinked purchase produce the expected counts."""
        banana_enriched = {
            "retailer": "giant",
            "order_id": "g1",
            "line_no": "1",
            "item_name_norm": "BANANA",
            "item_name": "FRESH BANANA",
            "retailer_item_id": "1",
            "upc": "4011",
            "brand_guess": "",
            "variant": "",
            "size_value": "",
            "size_unit": "",
            "pack_qty": "",
            "measure_type": "weight",
            "image_url": "",
            "is_store_brand": "false",
            "is_fee": "false",
            "is_discount_line": "false",
            "is_coupon_line": "false",
            "order_date": "2026-03-01",
            "line_total": "1.29",
        }
        # Purchase already linked to a canonical product.
        linked_banana = {
            "observed_product_id": "gobs_banana",
            "canonical_product_id": "gcan_banana",
            "resolution_action": "",
            "is_fee": "false",
            "is_discount_line": "false",
            "is_coupon_line": "false",
            "retailer": "giant",
            "raw_item_name": "FRESH BANANA",
            "normalized_item_name": "BANANA",
            "upc": "4011",
            "line_total": "1.29",
        }
        # Purchase with an observed product but no canonical link yet.
        unlinked_lime = {
            "observed_product_id": "gobs_lime",
            "canonical_product_id": "",
            "resolution_action": "",
            "is_fee": "false",
            "is_discount_line": "false",
            "is_coupon_line": "false",
            "retailer": "costco",
            "raw_item_name": "LIME 5LB",
            "normalized_item_name": "LIME",
            "upc": "",
            "line_total": "4.99",
        }

        summary = report_pipeline_status.build_status_summary(
            giant_orders=[{"order_id": "g1"}],
            giant_items=[{"order_id": "g1", "line_no": "1"}],
            giant_enriched=[banana_enriched],
            costco_orders=[],
            costco_items=[],
            costco_enriched=[],
            purchases=[linked_banana, unlinked_lime],
            resolutions=[],
        )

        # Index the summary rows by stage name for direct lookup.
        counts = {}
        for entry in summary:
            counts[entry["stage"]] = entry["count"]

        self.assertEqual(1, counts["raw_orders"])
        self.assertEqual(1, counts["raw_items"])
        self.assertEqual(1, counts["enriched_items"])
        self.assertEqual(1, counts["canonical_linked_purchase_rows"])
        self.assertEqual(1, counts["unresolved_purchase_rows"])
        self.assertEqual(1, counts["review_queue_observed_products"])
        self.assertEqual(0, counts["unresolved_not_in_review_rows"])
|
||||||
|
|
||||||
|
|
||||||
|
# Run the unittest runner when this test file is executed directly.
if __name__ == "__main__":
    unittest.main()
|
||||||
Reference in New Issue
Block a user