Files
scrape-giant/enrich_costco.py

331 lines
11 KiB
Python

import csv
import json
import re
from collections import defaultdict
from pathlib import Path
import click
from enrich_giant import (
OUTPUT_FIELDS,
format_decimal,
normalize_number,
normalize_unit,
normalize_whitespace,
singularize_tokens,
to_decimal,
)
# Tag written to the ``parse_version`` column of every emitted row.
PARSER_VERSION = "costco-enrich-v1"
# Retailer label written to every row and used as the observed_item_key prefix.
RETAILER = "costco"
DEFAULT_INPUT_DIR = Path("costco_output/raw")
DEFAULT_OUTPUT_CSV = Path("costco_output/items_enriched.csv")

# Code-like tokens (SL12, T12H3, P64/2, FY24, 12X16, CSPC#, ...) that are
# blanked out of item names.
# ``CSPC#`` is kept outside the trailing ``\b``: "#" is a non-word character,
# so a word boundary after it exists only when a word character follows
# immediately, and a standalone "CSPC#" would otherwise never be matched.
CODE_TOKEN_RE = re.compile(
    r"\b(?:(?:SL\d+|T\d+H\d+|P\d+(?:/\d+)?|W\d+T\d+H\d+|FY\d+|C\d+T\d+H\d+|EC\d+T\d+H\d+|\d+X\d+)\b|CSPC#)"
)
# "<pack>/<size> <unit>", e.g. "6/12 OZ": group 1 pack count, group 2 size,
# group 3 unit.
PACK_FRACTION_RE = re.compile(r"(?<![A-Z0-9])(\d+)\s*/\s*(\d+(?:\.\d+)?)\s*(OZ|LB|LBS|CT)\b")
# "<number>#" shorthand, e.g. "5#"; callers treat the number as pounds.
# No trailing ``\b`` here: after "#" it would reject "5# BAG" and a name
# ending in "2#", which are exactly the forms this pattern is meant to catch.
HASH_SIZE_RE = re.compile(r"(?<![A-Z0-9])(\d+(?:\.\d+)?)#")
# "<n>-PACK" and "<n> PACK" / "<n>PACK".
PACK_DASH_RE = re.compile(r"(?<![A-Z0-9])(\d+)\s*-\s*PACK\b")
PACK_WORD_RE = re.compile(r"(?<![A-Z0-9])(\d+)\s*PACK\b")
# "<size> <unit>", e.g. "16.9 OZ" or "24 CT".
SIZE_RE = re.compile(r"(?<![A-Z0-9])(\d+(?:\.\d+)?)\s*(OZ|LB|LBS|CT|KG|G)\b")
# Leading "/<digits>" at the start of a description; group 1 is the digits.
DISCOUNT_TARGET_RE = re.compile(r"^/\s*(\d+)\b")
def clean_costco_name(name):
    """Return *name* upper-cased with quotes, code tokens and "/<n> KG|G" removed.

    The input is passed through ``normalize_whitespace`` before and after the
    substitutions; every removed span is replaced by a single space first.
    """
    text = normalize_whitespace(name).upper()
    text = text.replace('"', "")
    # Blank out code-like tokens matched by CODE_TOKEN_RE.
    without_codes = CODE_TOKEN_RE.sub(" ", text)
    # Drop a trailing metric size such as "/ 1.36 KG" or "/454 G".
    without_metric = re.sub(r"\s*/\s*\d+(?:\.\d+)?\s*(KG|G)\b", " ", without_codes)
    return normalize_whitespace(without_metric)
def combine_description(item):
    """Join the two description fields of a receipt item into one string.

    Reads ``itemDescription01`` and ``itemDescription02`` from *item*, skips
    falsy values, strips each remaining part, joins them with a space and
    passes the result through ``normalize_whitespace``.
    """
    pieces = []
    for key in ("itemDescription01", "itemDescription02"):
        value = item.get(key)
        if value:
            pieces.append(str(value).strip())
    return normalize_whitespace(" ".join(pieces))
def parse_costco_size_and_pack(cleaned_name):
    """Extract ``(size_value, size_unit, pack_qty)`` from a cleaned item name.

    Each element is an empty string when nothing was recognised. A
    "<pack>/<size> <unit>" fraction wins outright; otherwise a "<n>#" size
    (reported as "lb"), a "<n>-PACK"/"<n> PACK" quantity and the *last*
    "<size> <unit>" occurrence are combined, with the latter overriding a
    "#" size.
    """
    fraction = PACK_FRACTION_RE.search(cleaned_name)
    if fraction:
        pack_qty = normalize_number(fraction.group(1))
        size_value = normalize_number(fraction.group(2))
        # NOTE(review): "CT" goes through normalize_unit here, whereas the
        # SIZE_RE path below maps "CT" to "count" explicitly - confirm the
        # two paths are meant to differ.
        size_unit = normalize_unit(fraction.group(3))
        return size_value, size_unit, pack_qty

    size_value, size_unit, pack_qty = "", "", ""

    hash_match = HASH_SIZE_RE.search(cleaned_name)
    if hash_match:
        size_value, size_unit = normalize_number(hash_match.group(1)), "lb"

    pack_match = PACK_DASH_RE.search(cleaned_name) or PACK_WORD_RE.search(cleaned_name)
    if pack_match:
        pack_qty = normalize_number(pack_match.group(1))

    # Walk every size occurrence and keep only the final one.
    final_size = None
    for final_size in SIZE_RE.finditer(cleaned_name):
        pass
    if final_size is not None:
        raw_unit = final_size.group(2)
        size_value = normalize_number(final_size.group(1))
        if raw_unit == "CT":
            size_unit = "count"
        else:
            size_unit = normalize_unit(raw_unit)

    return size_value, size_unit, pack_qty
def normalize_costco_name(cleaned_name):
    """Split a cleaned name into a normalized base name plus parsed attributes.

    Returns ``(item_name_norm, brand, size_value, size_unit, pack_qty)``.

    * A leading "KS " is removed and reported as brand "KS".
    * Size and pack information is parsed with ``parse_costco_size_and_pack``
      and the matching tokens are removed from the base name.
    * The token "ORG" is dropped; "PEANUT" and "BUTTER" are dropped when the
      name contains "JIF".
    * The remaining tokens are passed through ``singularize_tokens``.
    """
    brand = ""
    base = cleaned_name
    if base.startswith("KS "):
        brand = "KS"
        base = normalize_whitespace(base[3:])

    size_value, size_unit, pack_qty = parse_costco_size_and_pack(base)
    if size_value and size_unit:
        # Choose the pattern by what is actually present in the name, not by
        # whether a pack quantity exists: a pack quantity can also come from
        # "N PACK"/"N-PACK", in which case the size is a plain "<size> <unit>"
        # token that only SIZE_RE can remove. Running SIZE_RE on a fraction
        # such as "6/12 OZ" would leave a dangling "6/", so the fraction
        # pattern takes precedence when it matches.
        if PACK_FRACTION_RE.search(base):
            base = PACK_FRACTION_RE.sub(" ", base)
        else:
            base = SIZE_RE.sub(" ", base)
    base = HASH_SIZE_RE.sub(" ", base)
    base = PACK_DASH_RE.sub(" ", base)
    base = PACK_WORD_RE.sub(" ", base)
    base = normalize_whitespace(base)

    # Substring check on the whole name, evaluated once for all tokens.
    mentions_jif = "JIF" in base
    kept = [
        token
        for token in base.split()
        if token != "ORG" and not (mentions_jif and token in {"PEANUT", "BUTTER"})
    ]
    base = singularize_tokens(" ".join(kept))
    return normalize_whitespace(base), brand, size_value, size_unit, pack_qty
def guess_measure_type(size_unit, pack_qty, is_discount_line):
    """Classify a line as "each", "weight", "volume" or "count".

    Discount lines are always "each". Otherwise the unit decides: weight
    units, then volume units, then "count" when the unit is "count" or a
    pack quantity is present, falling back to "each".
    """
    if is_discount_line:
        return "each"

    unit_families = (
        ("weight", {"lb", "oz", "g", "kg"}),
        ("volume", {"ml", "l", "qt", "pt", "gal", "fl_oz"}),
    )
    for label, units in unit_families:
        if size_unit in units:
            return label

    return "count" if (size_unit == "count" or pack_qty) else "each"
def derive_costco_prices(item, measure_type, size_value, size_unit, pack_qty):
    """Compute ``(price_per_each, price_per_lb, price_per_oz)`` for a line.

    Values are strings produced by ``format_decimal``; each is "" when it
    cannot be derived. The line total is read from ``item["amount"]`` and the
    quantity from ``item["unit"]``. Per-weight prices are only produced for
    sizes in "lb" or "oz", using 16 oz per lb.
    """
    line_total = to_decimal(item.get("amount"))
    qty = to_decimal(item.get("unit"))
    parsed_size = to_decimal(size_value)
    parsed_pack = to_decimal(pack_qty) or 1

    per_each_text = ""
    per_lb_text = ""
    per_oz_text = ""

    if line_total is not None:
        if measure_type in {"each", "count"} and qty not in (None, 0):
            per_each_text = format_decimal(line_total / qty)

        if parsed_size not in (None, 0):
            # Total amount of product on the line: size x pack x quantity,
            # treating a missing or zero quantity as 1.
            total_units = parsed_size * parsed_pack * (qty or 1)
            if size_unit == "lb":
                per_lb = line_total / total_units
                per_lb_text = format_decimal(per_lb)
                per_oz_text = format_decimal(per_lb / 16)
            elif size_unit == "oz":
                per_oz = line_total / total_units
                per_oz_text = format_decimal(per_oz)
                per_lb_text = format_decimal(per_oz * 16)

    return per_each_text, per_lb_text, per_oz_text
def is_discount_item(item):
    """Return True when *item* looks like a discount line.

    A line counts as a discount when its ``amount`` or ``unit`` is negative,
    or when its combined description starts with "/".
    """
    amount_value = to_decimal(item.get("amount")) or 0
    unit_value = to_decimal(item.get("unit")) or 0
    text = combine_description(item)

    if amount_value < 0:
        return True
    if unit_value < 0:
        return True
    return text.startswith("/")
def discount_target_id(raw_name):
    """Return the digits following a leading "/" in *raw_name*, or "" if absent."""
    found = DISCOUNT_TARGET_RE.match(normalize_whitespace(raw_name))
    return found.group(1) if found else ""
def parse_costco_item(order_id, order_date, raw_path, line_no, item):
    """Build one enriched output row (a dict of strings) from a receipt item.

    Args:
        order_id: Identifier of the receipt the item belongs to.
        order_date: Receipt date text; passed through ``normalize_whitespace``.
        raw_path: ``Path`` of the JSON file the item came from.
        line_no: 1-based position of the item within its receipt.
        item: Mapping with the raw receipt item fields.

    Returns:
        A dict of string values describing the line. Fields that this parser
        does not populate are emitted as "".
    """

    def field_text(key):
        # A missing key and an explicit JSON null both become "", so the CSV
        # never contains the literal text "None" and a null item number never
        # turns into a truthy id during discount matching.
        value = item.get(key)
        return "" if value is None else str(value)

    raw_name = combine_description(item)
    cleaned_name = clean_costco_name(raw_name)
    item_name_norm, brand_guess, size_value, size_unit, pack_qty = normalize_costco_name(
        cleaned_name
    )
    is_discount_line = is_discount_item(item)
    is_coupon_line = "true" if raw_name.startswith("/") else "false"
    measure_type = guess_measure_type(size_unit, pack_qty, is_discount_line)
    price_per_each, price_per_lb, price_per_oz = derive_costco_prices(
        item, measure_type, size_value, size_unit, pack_qty
    )
    amount_text = field_text("amount")
    return {
        "retailer": RETAILER,
        "order_id": str(order_id),
        "line_no": str(line_no),
        "observed_item_key": f"{RETAILER}:{order_id}:{line_no}",
        "order_date": normalize_whitespace(order_date),
        "retailer_item_id": field_text("itemNumber"),
        "pod_id": "",
        "item_name": raw_name,
        "upc": "",
        "category_id": field_text("itemDepartmentNumber"),
        "category": field_text("transDepartmentNumber"),
        "qty": field_text("unit"),
        "unit": field_text("itemIdentifier"),
        "unit_price": field_text("itemUnitPriceAmount"),
        "line_total": amount_text,
        "picked_weight": "",
        "mvp_savings": "",
        "reward_savings": "",
        # The amount lands in exactly one of coupon_savings / net_line_total,
        # depending on whether this is a discount line.
        "coupon_savings": amount_text if is_discount_line else "",
        "coupon_price": "",
        "matched_discount_amount": "",
        "net_line_total": amount_text if not is_discount_line else "",
        "image_url": "",
        "raw_order_path": raw_path.as_posix(),
        "item_name_norm": item_name_norm,
        "brand_guess": brand_guess,
        "variant": "",
        "size_value": size_value,
        "size_unit": size_unit,
        "pack_qty": pack_qty,
        "measure_type": measure_type,
        "is_store_brand": "true" if brand_guess else "false",
        "is_fee": "false",
        "is_discount_line": "true" if is_discount_line else "false",
        "is_coupon_line": is_coupon_line,
        "price_per_each": price_per_each,
        "price_per_lb": price_per_lb,
        "price_per_oz": price_per_oz,
        "parse_version": PARSER_VERSION,
        "parse_notes": "",
    }
def match_costco_discounts(rows):
    """Attach discount lines to the purchase line they target, in place.

    Within each order, a discount row whose name starts with "/<digits>" is
    matched to the purchase row whose ``retailer_item_id`` equals those
    digits. The match is applied only when exactly one purchase row
    qualifies; otherwise the discount row gets a
    ``discount_target_unmatched`` note. On a match the purchase row's
    ``matched_discount_amount`` and ``net_line_total`` are updated and both
    rows get a note. Nothing is returned.
    """

    def with_note(row, note):
        # Append ";<note>" to the existing notes, then trim stray semicolons.
        return normalize_whitespace(f"{row.get('parse_notes', '')};{note}").strip(";")

    by_order = defaultdict(list)
    for row in rows:
        by_order[row["order_id"]].append(row)

    for order_rows in by_order.values():
        # One pass: index purchases by item id, collect discounts in order.
        purchases_by_item_id = defaultdict(list)
        discount_rows = []
        for row in order_rows:
            if row.get("is_discount_line") == "true":
                discount_rows.append(row)
                continue
            item_id = row.get("retailer_item_id", "")
            if item_id:
                purchases_by_item_id[item_id].append(row)

        for discount_row in discount_rows:
            target_id = discount_target_id(discount_row.get("item_name", ""))
            if not target_id:
                continue

            candidates = purchases_by_item_id.get(target_id, [])
            if len(candidates) != 1:
                discount_row["parse_notes"] = with_note(
                    discount_row, f"discount_target_unmatched={target_id}"
                )
                continue

            (purchase_row,) = candidates
            discount_amount = to_decimal(discount_row.get("line_total"))
            gross_total = to_decimal(purchase_row.get("line_total"))
            prior_discount = to_decimal(purchase_row.get("matched_discount_amount")) or 0
            if discount_amount is None or gross_total is None:
                continue

            # Accumulate, so several discount lines can hit the same purchase.
            running_discount = prior_discount + discount_amount
            purchase_row["matched_discount_amount"] = format_decimal(running_discount)
            purchase_row["net_line_total"] = format_decimal(gross_total + running_discount)
            purchase_row["parse_notes"] = with_note(
                purchase_row, f"matched_discount={target_id}"
            )
            discount_row["parse_notes"] = with_note(
                discount_row, f"matched_to_item={target_id}"
            )
def iter_costco_rows(raw_dir):
    """Yield one enriched row per receipt item found under *raw_dir*.

    Files are located with ``discover_json_files``. ``summary.json`` and
    ``summary_requests.json`` are skipped, as are files whose top-level JSON
    value is not an object. Receipts are read from
    ``data.receiptsWithCounts.receipts`` and items from each receipt's
    ``itemArray``; line numbers start at 1.

    Raises:
        KeyError: if a receipt has no ``transactionBarcode``.
    """
    for path in discover_json_files(raw_dir):
        if path.name in {"summary.json", "summary_requests.json"}:
            continue
        payload = json.loads(path.read_text(encoding="utf-8"))
        if not isinstance(payload, dict):
            continue
        # ``dict.get(key, default)`` only applies the default when the key is
        # missing; a key that is present with a JSON null yields None. Use
        # ``or`` so null levels are treated the same as absent ones.
        data = payload.get("data") or {}
        receipts_with_counts = data.get("receiptsWithCounts") or {}
        receipts = receipts_with_counts.get("receipts") or []
        for receipt in receipts:
            order_id = receipt["transactionBarcode"]
            order_date = receipt.get("transactionDate") or ""
            items = receipt.get("itemArray") or []
            for line_no, item in enumerate(items, start=1):
                yield parse_costco_item(order_id, order_date, path, line_no, item)
def discover_json_files(raw_dir):
    """Return the sorted ``*.json`` paths directly inside *raw_dir*.

    When *raw_dir* contains none, is named "raw" and its parent exists, the
    parent directory's ``*.json`` files are returned instead. The result is
    an empty list when nothing is found.
    """
    directory = Path(raw_dir)
    found = sorted(directory.glob("*.json"))
    if not found and directory.name == "raw" and directory.parent.exists():
        # Fall back to JSON files sitting next to the "raw" directory.
        found = sorted(directory.parent.glob("*.json"))
    return found
def build_items_enriched(raw_dir):
    """Parse every receipt under *raw_dir*, match discounts and return sorted rows.

    Rows are ordered by ``order_date``, then ``order_id``, then numeric
    ``line_no``.
    """

    def sort_key(row):
        return row["order_date"], row["order_id"], int(row["line_no"])

    parsed_rows = list(iter_costco_rows(raw_dir))
    # Mutates the purchase and discount rows in place.
    match_costco_discounts(parsed_rows)
    return sorted(parsed_rows, key=sort_key)
def write_csv(path, rows):
    """Write *rows* to *path* as UTF-8 CSV with the ``OUTPUT_FIELDS`` header.

    Missing parent directories are created; an existing file is overwritten.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    # newline="" lets the csv module control line endings itself.
    with open(path, "w", newline="", encoding="utf-8") as out_file:
        csv_writer = csv.DictWriter(out_file, fieldnames=OUTPUT_FIELDS)
        csv_writer.writeheader()
        csv_writer.writerows(rows)
# Command-line entry point: read raw Costco receipt JSON from --input-dir and
# write the enriched item rows to --output-csv. Documented with comments rather
# than a docstring so the generated --help text is unchanged.
@click.command()
@click.option(
    "--input-dir",
    default=str(DEFAULT_INPUT_DIR),
    show_default=True,
    help="Directory containing Costco raw order json files.",
)
@click.option(
    "--output-csv",
    default=str(DEFAULT_OUTPUT_CSV),
    show_default=True,
    help="CSV path for enriched Costco item rows.",
)
def main(input_dir, output_csv):
    source_dir = Path(input_dir)
    destination = Path(output_csv)
    enriched_rows = build_items_enriched(source_dir)
    write_csv(destination, enriched_rows)
    click.echo(f"wrote {len(enriched_rows)} rows to {output_csv}")


if __name__ == "__main__":
    main()