"""Build derived analysis CSVs from the purchases CSV.

Reads purchase rows and writes five summaries: item price over time, spend by
visit, items per visit, category spend over time, and a retailer/store
breakdown.
"""

from collections import defaultdict
from pathlib import Path

import click

from enrich_giant import format_decimal, to_decimal
from layer_helpers import read_csv_rows, write_csv_rows

ITEM_PRICE_FIELDS = [
    "purchase_date",
    "retailer",
    "store_name",
    "store_number",
    "store_city",
    "store_state",
    "order_id",
    "catalog_id",
    "catalog_name",
    "category",
    "product_type",
    "effective_price",
    "effective_price_unit",
    "net_line_total",
    "normalized_quantity",
]

SPEND_BY_VISIT_FIELDS = [
    "purchase_date",
    "retailer",
    "order_id",
    "store_name",
    "store_number",
    "store_city",
    "store_state",
    "visit_spend_total",
]

ITEMS_PER_VISIT_FIELDS = [
    "purchase_date",
    "retailer",
    "order_id",
    "store_name",
    "store_number",
    "store_city",
    "store_state",
    "item_row_count",
    "distinct_catalog_count",
]

CATEGORY_SPEND_FIELDS = [
    "purchase_date",
    "retailer",
    "category",
    "category_spend_total",
]

RETAILER_STORE_FIELDS = [
    "retailer",
    "store_name",
    "store_number",
    "store_city",
    "store_state",
    "visit_count",
    "item_row_count",
    "store_spend_total",
]

# Grouping keys are kept separate from the *_FIELDS output schemas so that
# adding an output column never silently changes how rows are aggregated.
_VISIT_KEY_FIELDS = (
    "purchase_date",
    "retailer",
    "order_id",
    "store_name",
    "store_number",
    "store_city",
    "store_state",
)
_CATEGORY_KEY_FIELDS = ("purchase_date", "retailer", "category")
_STORE_KEY_FIELDS = (
    "retailer",
    "store_name",
    "store_number",
    "store_city",
    "store_state",
)
# Row flags that mark a line as something other than a purchased item.
_NON_ITEM_FLAGS = ("is_fee", "is_discount_line", "is_coupon_line")


def _group_key(row, fields):
    """Return the tuple of ``row`` values for ``fields`` ("" for absent columns)."""
    return tuple(row.get(field, "") for field in fields)


def effective_total(row):
    """Return the row's ``net_line_total``, falling back to ``line_total``.

    Both values go through ``to_decimal``; the fallback is used only when the
    net total converts to ``None``. The result may itself be ``None`` when
    neither column yields a value.
    """
    net_total = to_decimal(row.get("net_line_total"))
    if net_total is None:
        return to_decimal(row.get("line_total"))
    return net_total


def is_item_row(row):
    """Return True unless the row is flagged "true" as a fee, discount or coupon line."""
    return all(row.get(flag) != "true" for flag in _NON_ITEM_FLAGS)


def build_item_price_rows(purchase_rows):
    """Project priced catalog rows onto ``ITEM_PRICE_FIELDS``.

    Rows with an empty ``catalog_name`` or ``effective_price`` are skipped.
    Input order is preserved and absent columns become "".
    """
    return [
        {field: row.get(field, "") for field in ITEM_PRICE_FIELDS}
        for row in purchase_rows
        if row.get("catalog_name") and row.get("effective_price")
    ]


def build_spend_by_visit_rows(purchase_rows):
    """Sum the effective line totals per visit.

    A visit is identified by date, retailer, order id and the store columns.
    Rows whose effective total is ``None`` are ignored. Output is sorted by
    visit key.
    """
    spend_by_visit = defaultdict(lambda: to_decimal("0"))
    for row in purchase_rows:
        total = effective_total(row)
        if total is None:
            continue
        spend_by_visit[_group_key(row, _VISIT_KEY_FIELDS)] += total

    rows = []
    for key in sorted(spend_by_visit):
        out = dict(zip(_VISIT_KEY_FIELDS, key))
        out["visit_spend_total"] = format_decimal(spend_by_visit[key])
        rows.append(out)
    return rows


def build_items_per_visit_rows(purchase_rows):
    """Count item rows and distinct catalog ids per visit.

    Only rows accepted by ``is_item_row`` are counted. Rows with an empty
    ``catalog_id`` still count as item rows but add nothing to the distinct
    catalog count. Output is sorted by visit key.
    """
    visits = defaultdict(lambda: {"item_rows": 0, "catalog_ids": set()})
    for row in purchase_rows:
        if not is_item_row(row):
            continue
        visit = visits[_group_key(row, _VISIT_KEY_FIELDS)]
        visit["item_rows"] += 1
        catalog_id = row.get("catalog_id")
        if catalog_id:
            visit["catalog_ids"].add(catalog_id)

    rows = []
    for key in sorted(visits):
        visit = visits[key]
        out = dict(zip(_VISIT_KEY_FIELDS, key))
        out["item_row_count"] = str(visit["item_rows"])
        out["distinct_catalog_count"] = str(len(visit["catalog_ids"]))
        rows.append(out)
    return rows


def build_category_spend_rows(purchase_rows):
    """Sum the effective line totals per (purchase_date, retailer, category).

    Rows with an empty category or a ``None`` effective total are skipped.
    Output is sorted by key.
    """
    spend_by_category = defaultdict(lambda: to_decimal("0"))
    for row in purchase_rows:
        # The total is computed before the category check, as in the original
        # code, so any conversion error from to_decimal surfaces for every row.
        total = effective_total(row)
        if not row.get("category", "") or total is None:
            continue
        spend_by_category[_group_key(row, _CATEGORY_KEY_FIELDS)] += total

    rows = []
    for key in sorted(spend_by_category):
        out = dict(zip(_CATEGORY_KEY_FIELDS, key))
        out["category_spend_total"] = format_decimal(spend_by_category[key])
        rows.append(out)
    return rows


def build_retailer_store_rows(purchase_rows):
    """Summarise visits, item rows and spend per retailer/store.

    Every row contributes its (purchase_date, order_id) pair to the store's
    visit set, including fee/discount/coupon rows and rows without a total.
    Only ``is_item_row`` rows count towards ``item_row_count``, and only rows
    with a non-``None`` effective total add to the spend. Output is sorted by
    store key.
    """
    stores = defaultdict(
        lambda: {"visit_ids": set(), "item_rows": 0, "total": to_decimal("0")}
    )
    for row in purchase_rows:
        total = effective_total(row)
        store = stores[_group_key(row, _STORE_KEY_FIELDS)]
        store["visit_ids"].add((row.get("purchase_date", ""), row.get("order_id", "")))
        if is_item_row(row):
            store["item_rows"] += 1
        if total is not None:
            store["total"] += total

    rows = []
    for key in sorted(stores):
        store = stores[key]
        out = dict(zip(_STORE_KEY_FIELDS, key))
        out["visit_count"] = str(len(store["visit_ids"]))
        out["item_row_count"] = str(store["item_rows"])
        out["store_spend_total"] = format_decimal(store["total"])
        rows.append(out)
    return rows


@click.command()
@click.option("--purchases-csv", default="data/analysis/purchases.csv", show_default=True)
@click.option("--output-dir", default="data/analysis", show_default=True)
def main(purchases_csv, output_dir):
    """Read the purchases CSV and write the derived analysis CSVs to the output directory."""
    purchase_rows = read_csv_rows(purchases_csv)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # All builders run before anything is written, so a failure while
    # aggregating does not leave a partially written set of outputs.
    outputs = [
        ("item_price_over_time.csv", build_item_price_rows(purchase_rows), ITEM_PRICE_FIELDS),
        ("spend_by_visit.csv", build_spend_by_visit_rows(purchase_rows), SPEND_BY_VISIT_FIELDS),
        ("items_per_visit.csv", build_items_per_visit_rows(purchase_rows), ITEMS_PER_VISIT_FIELDS),
        (
            "category_spend_over_time.csv",
            build_category_spend_rows(purchase_rows),
            CATEGORY_SPEND_FIELDS,
        ),
        (
            "retailer_store_breakdown.csv",
            build_retailer_store_rows(purchase_rows),
            RETAILER_STORE_FIELDS,
        ),
    ]
    for filename, rows, fieldnames in outputs:
        write_csv_rows(output_path / filename, rows, fieldnames)

    click.echo(f"wrote analysis outputs to {output_path}")


if __name__ == "__main__":
    main()