Files
scrape-giant/enrich_costco.py

380 lines
13 KiB
Python

import csv
import json
import re
from collections import defaultdict
from pathlib import Path
import click
from enrich_giant import (
OUTPUT_FIELDS,
derive_normalized_quantity,
derive_price_fields,
format_decimal,
normalization_identity,
normalize_number,
normalize_unit,
normalize_whitespace,
singularize_tokens,
to_decimal,
)
# Version tag written to the "parse_version" column of every output row.
PARSER_VERSION = "costco-enrich-v1"
RETAILER = "costco"
DEFAULT_INPUT_DIR = Path("costco_output/raw")
DEFAULT_OUTPUT_CSV = Path("costco_output/items_enriched.csv")

# The patterns below only list upper-case spellings; clean_costco_name()
# upper-cases names before they are applied.

# Code-like tokens (SL12, T10H5, P24/4, FY25, 12X12, ...) that
# clean_costco_name() strips from the description.
# NOTE(review): because of the trailing \b, the "CSPC#" alternative only
# matches when a word character directly follows the "#" -- confirm intended.
CODE_TOKEN_RE = re.compile(
    r"\b(?:SL\d+|T\d+H\d+|P\d+(?:/\d+)?|W\d+T\d+H\d+|FY\d+|CSPC#|C\d+T\d+H\d+|EC\d+T\d+H\d+|\d+X\d+)\b"
)

# "<pack>/<size> <unit>", e.g. "2/24 OZ": groups are (pack, size, unit).
PACK_FRACTION_RE = re.compile(r"(?<![A-Z0-9])(\d+)\s*/\s*(\d+(?:\.\d+)?)\s*(OZ|LB|LBS|CT)\b")

# "<number>#", which parse_costco_size_and_pack() reads as pounds.
# There is deliberately no trailing \b: "#" is a non-word character, so "#\b"
# would only match when a word character follows and would miss "5#" at the
# end of a name or before a space.
HASH_SIZE_RE = re.compile(r"(?<![A-Z0-9])(\d+(?:\.\d+)?)#")

# "#" followed by word characters, e.g. "#12345".
ITEM_CODE_RE = re.compile(r"#\w+\b")

# Two weights separated by a slash, e.g. "1.36 KG / 3 LB".
DUAL_WEIGHT_RE = re.compile(
    r"\b\d+(?:\.\d+)?\s*(?:KG|G|LB|LBS|OZ)\s*/\s*\d+(?:\.\d+)?\s*(?:KG|G|LB|LBS|OZ)\b"
)

# Slash-separated T<n>/H<n>/P<n> code groups.
LOGISTICS_SLASH_RE = re.compile(r"\b(?:T\d+/H\d+(?:/P\d+)?/?|H\d+/P\d+/?|T\d+/H\d+/?)\b")

# "<n>-PACK" and "<n> PACK" / "<n>PACK"; group 1 is the pack count.
PACK_DASH_RE = re.compile(r"(?<![A-Z0-9])(\d+)\s*-\s*PACK\b")
PACK_WORD_RE = re.compile(r"(?<![A-Z0-9])(\d+)\s*PACK\b")

# "<number> <unit>" sizes; groups are (value, unit).
SIZE_RE = re.compile(
    r"(?<![A-Z0-9])(\d+(?:\.\d+)?)\s*(OZ|LB|LBS|CT|KG|G|QT|QTS|PT|PTS|GAL|GALS|FL OZ|FLOZ)\b"
)

# Leading "/<digits>" on a discount line; group 1 is the targeted item number.
DISCOUNT_TARGET_RE = re.compile(r"^/\s*(\d+)\b")
def clean_costco_name(name):
    """Return an upper-cased receipt name with quotes and code tokens removed.

    The name is whitespace-normalized and upper-cased, double quotes are
    dropped, tokens matching ``CODE_TOKEN_RE`` are blanked out, and a trailing
    "/ <number> KG|G" metric weight is removed before whitespace is
    normalized once more.
    """
    upper = normalize_whitespace(name).upper()
    unquoted = upper.replace('"', "")
    without_codes = CODE_TOKEN_RE.sub(" ", unquoted)
    # Drop the metric half of "... / 1.36 KG" style suffixes.
    without_metric = re.sub(r"\s*/\s*\d+(?:\.\d+)?\s*(KG|G)\b", " ", without_codes)
    return normalize_whitespace(without_metric)
def combine_description(item):
    """Join the item's two description fields into one normalized string.

    Falsy parts (missing, ``None``, empty) are skipped; the remaining parts
    are stringified, stripped and joined with a single space.
    """
    pieces = []
    for key in ("itemDescription01", "itemDescription02"):
        value = item.get(key)
        if value:
            pieces.append(str(value).strip())
    return normalize_whitespace(" ".join(pieces))
def parse_costco_size_and_pack(cleaned_name):
    """Extract ``(size_value, size_unit, pack_qty)`` strings from a name.

    A "<pack>/<size> <unit>" fraction wins outright. Otherwise a "<n>#" size
    is read as pounds, a "<n>-PACK" / "<n> PACK" token supplies the pack
    count, and the last plain "<n> <unit>" size in the name (if any)
    overrides the "#" size. Parts that are not found are returned as "".
    """
    fraction = PACK_FRACTION_RE.search(cleaned_name)
    if fraction is not None:
        pack_qty = normalize_number(fraction.group(1))
        size_value = normalize_number(fraction.group(2))
        size_unit = normalize_unit(fraction.group(3))
        return size_value, size_unit, pack_qty

    size_value = size_unit = pack_qty = ""

    pounds = HASH_SIZE_RE.search(cleaned_name)
    if pounds is not None:
        size_value, size_unit = normalize_number(pounds.group(1)), "lb"

    pack = PACK_DASH_RE.search(cleaned_name) or PACK_WORD_RE.search(cleaned_name)
    if pack is not None:
        pack_qty = normalize_number(pack.group(1))

    # Only the right-most size token counts.
    final_size = None
    for final_size in SIZE_RE.finditer(cleaned_name):
        pass
    if final_size is not None:
        raw_unit = final_size.group(2)
        size_value = normalize_number(final_size.group(1))
        if raw_unit == "CT":
            size_unit = "count"
        else:
            size_unit = normalize_unit(raw_unit)

    return size_value, size_unit, pack_qty
def normalize_costco_name(cleaned_name):
    """Reduce a cleaned name to its normalized form plus parsed attributes.

    Returns ``(name, brand, size_value, size_unit, pack_qty)``. A leading
    "KS " prefix is split off as the brand, size/pack/code tokens are removed
    from the name, a few filler tokens are dropped, and the remainder is
    passed through ``singularize_tokens``.
    """
    brand = ""
    base = cleaned_name
    if base.startswith("KS "):
        brand = "KS"
        base = normalize_whitespace(base[3:])

    size_value, size_unit, pack_qty = parse_costco_size_and_pack(base)

    if size_value and size_unit:
        # With a pack count only the "<pack>/<size> <unit>" form is removed
        # here; otherwise every plain size token is removed.
        size_pattern = PACK_FRACTION_RE if pack_qty else SIZE_RE
        base = size_pattern.sub(" ", base)

    for pattern in (
        DUAL_WEIGHT_RE,
        HASH_SIZE_RE,
        ITEM_CODE_RE,
        LOGISTICS_SLASH_RE,
        PACK_DASH_RE,
        PACK_WORD_RE,
    ):
        base = pattern.sub(" ", base)
    base = normalize_whitespace(base)

    always_dropped = {"/", "-", "ORG"}
    # "PEANUT" and "BUTTER" are dropped only when "JIF" occurs in the name.
    mentions_jif = "JIF" in base
    kept = [
        token
        for token in base.split()
        if token not in always_dropped
        and not (mentions_jif and token in {"PEANUT", "BUTTER"})
    ]
    base = singularize_tokens(" ".join(kept))
    return normalize_whitespace(base), brand, size_value, size_unit, pack_qty
def guess_measure_type(size_unit, pack_qty, is_discount_line):
    """Classify a line as "weight", "volume", "count" or "each".

    Discount lines are always "each". Otherwise the size unit decides; a
    line with no recognized unit but a truthy pack quantity is "count", and
    anything else is "each".
    """
    if is_discount_line:
        return "each"

    kind_by_unit = {
        **dict.fromkeys(("lb", "oz", "g", "kg"), "weight"),
        **dict.fromkeys(("ml", "l", "qt", "pt", "gal", "fl_oz"), "volume"),
        "count": "count",
    }
    kind = kind_by_unit.get(size_unit)
    if kind is None and pack_qty:
        kind = "count"
    return kind or "each"
def derive_costco_prices(item, measure_type, size_value, size_unit, pack_qty):
    """Return ``(price_per_each, price_per_lb, price_per_oz)`` as strings.

    Values come from the item's "amount" (line total) and "unit" (quantity)
    fields. Any price that cannot be derived is returned as "".
    """
    line_total = to_decimal(item.get("amount"))
    qty = to_decimal(item.get("unit"))
    parsed_size = to_decimal(size_value)
    parsed_pack = to_decimal(pack_qty) or 1  # missing or zero pack counts as 1

    per_each = per_lb = per_oz = ""
    if line_total is None:
        return per_each, per_lb, per_oz

    if qty not in (None, 0) and measure_type in {"each", "count"}:
        per_each = format_decimal(line_total / qty)

    if parsed_size in (None, 0):
        return per_each, per_lb, per_oz

    # Total amount of product on the line: size x pack x quantity.
    total_units = parsed_size * parsed_pack * (qty or 1)
    if size_unit == "lb":
        lb_price = line_total / total_units
        per_lb = format_decimal(lb_price)
        per_oz = format_decimal(lb_price / 16)
    elif size_unit == "oz":
        oz_price = line_total / total_units
        per_oz = format_decimal(oz_price)
        per_lb = format_decimal(oz_price * 16)
    return per_each, per_lb, per_oz
def is_discount_item(item):
    """Return True when the line has a negative amount/unit or a "/" name."""
    # Unparseable or missing numbers count as zero.
    numbers = [to_decimal(item.get(field)) or 0 for field in ("amount", "unit")]
    description = combine_description(item)
    if any(number < 0 for number in numbers):
        return True
    return description.startswith("/")
def discount_target_id(raw_name):
    """Return the item number after a leading "/" in the name, or ""."""
    found = DISCOUNT_TARGET_RE.match(normalize_whitespace(raw_name))
    return found.group(1) if found else ""
def _costco_field_text(item, key):
    """Return ``item[key]`` as text, or "" when the key is missing or null."""
    value = item.get(key)
    return "" if value is None else str(value)


def parse_costco_item(order_id, order_date, raw_path, line_no, item):
    """Build one enriched output row (a dict of strings) for a receipt line.

    Args:
        order_id: Receipt identifier; used in the row id and "order_id".
        order_date: Receipt date text; whitespace-normalized into "order_date".
        raw_path: Path of the source JSON file (``as_posix()`` is called on it).
        line_no: 1-based position of the item within the receipt.
        item: Raw item mapping from the receipt's item array.

    Returns:
        A dict with the identity, raw, parsed and price columns for the line.
        Raw fields that are missing or JSON null are written as "" rather
        than the literal text "None".
    """
    raw_name = combine_description(item)
    cleaned_name = clean_costco_name(raw_name)
    item_name_norm, brand_guess, size_value, size_unit, pack_qty = normalize_costco_name(
        cleaned_name
    )
    is_discount_line = is_discount_item(item)
    is_coupon_line = "true" if raw_name.startswith("/") else "false"
    measure_type = guess_measure_type(size_unit, pack_qty, is_discount_line)
    price_per_each, price_per_lb, price_per_oz = derive_costco_prices(
        item, measure_type, size_value, size_unit, pack_qty
    )

    # dict.get(key, "") only falls back for *missing* keys; an explicit JSON
    # null would otherwise be stringified to "None".
    amount_text = _costco_field_text(item, "amount")
    qty_text = _costco_field_text(item, "unit")
    retailer_item_id = _costco_field_text(item, "itemNumber")

    normalized_row_id = f"{RETAILER}:{order_id}:{line_no}"
    normalized_quantity, normalized_quantity_unit = derive_normalized_quantity(
        item.get("unit"),
        size_value,
        size_unit,
        pack_qty,
        measure_type,
        "",
    )
    identity_key, normalization_basis = normalization_identity(
        {
            "retailer": RETAILER,
            "normalized_row_id": normalized_row_id,
            "upc": "",
            "retailer_item_id": retailer_item_id,
            "item_name_norm": item_name_norm,
            "size_value": size_value,
            "size_unit": size_unit,
            "pack_qty": pack_qty,
        }
    )
    price_fields = derive_price_fields(
        price_per_each,
        price_per_lb,
        price_per_oz,
        amount_text,
        qty_text,
        pack_qty,
    )
    return {
        "retailer": RETAILER,
        "order_id": str(order_id),
        "line_no": str(line_no),
        "normalized_row_id": normalized_row_id,
        "normalized_item_id": f"cnorm:{identity_key}",
        "normalization_basis": normalization_basis,
        "observed_item_key": normalized_row_id,
        "order_date": normalize_whitespace(order_date),
        "retailer_item_id": retailer_item_id,
        "pod_id": "",
        "item_name": raw_name,
        "upc": "",
        "category_id": _costco_field_text(item, "itemDepartmentNumber"),
        "category": _costco_field_text(item, "transDepartmentNumber"),
        "qty": qty_text,
        "unit": _costco_field_text(item, "itemIdentifier"),
        "unit_price": _costco_field_text(item, "itemUnitPriceAmount"),
        "line_total": amount_text,
        "picked_weight": "",
        "mvp_savings": "",
        "reward_savings": "",
        # A discount line carries its amount as savings; a purchase line
        # carries it as the (not yet discounted) net total.
        "coupon_savings": amount_text if is_discount_line else "",
        "coupon_price": "",
        "matched_discount_amount": "",
        "net_line_total": amount_text if not is_discount_line else "",
        "image_url": "",
        "raw_order_path": raw_path.as_posix(),
        "item_name_norm": item_name_norm,
        "brand_guess": brand_guess,
        "variant": "",
        "size_value": size_value,
        "size_unit": size_unit,
        "pack_qty": pack_qty,
        "measure_type": measure_type,
        "normalized_quantity": normalized_quantity,
        "normalized_quantity_unit": normalized_quantity_unit,
        "is_store_brand": "true" if brand_guess else "false",
        "is_item": "false" if is_discount_line else "true",
        "is_fee": "false",
        "is_discount_line": "true" if is_discount_line else "false",
        "is_coupon_line": is_coupon_line,
        **price_fields,
        "parse_version": PARSER_VERSION,
        "parse_notes": "",
    }
def match_costco_discounts(rows):
    """Attach discount lines to the purchase line they target, in place.

    Within each order, a discount row whose name starts with "/<item number>"
    is matched to the purchase row with that ``retailer_item_id``. The match
    is applied only when exactly one such purchase row exists; otherwise the
    discount row gets a ``discount_target_unmatched`` note. On a match the
    purchase row's ``matched_discount_amount`` and ``net_line_total`` are
    updated and both rows receive a parse note.
    """

    def add_note(target_row, note):
        # Append ";<note>" to the row's notes, trimming stray semicolons.
        target_row["parse_notes"] = normalize_whitespace(
            f"{target_row.get('parse_notes', '')};{note}"
        ).strip(";")

    by_order = {}
    for row in rows:
        by_order.setdefault(row["order_id"], []).append(row)

    for order_rows in by_order.values():
        purchases_by_item = {}
        discount_rows = []
        for row in order_rows:
            if row.get("is_discount_line") == "true":
                discount_rows.append(row)
                continue
            item_id = row.get("retailer_item_id", "")
            if item_id:
                purchases_by_item.setdefault(item_id, []).append(row)

        for discount_row in discount_rows:
            target_id = discount_target_id(discount_row.get("item_name", ""))
            if not target_id:
                continue

            candidates = purchases_by_item.get(target_id, [])
            if len(candidates) != 1:
                add_note(discount_row, f"discount_target_unmatched={target_id}")
                continue

            (purchase_row,) = candidates
            discount_amount = to_decimal(discount_row.get("line_total"))
            gross_total = to_decimal(purchase_row.get("line_total"))
            prior_discount = to_decimal(purchase_row.get("matched_discount_amount")) or 0
            if discount_amount is None or gross_total is None:
                continue

            # The discount is added to the gross total as-is to get the net.
            running_discount = prior_discount + discount_amount
            purchase_row["matched_discount_amount"] = format_decimal(running_discount)
            purchase_row["net_line_total"] = format_decimal(gross_total + running_discount)
            add_note(purchase_row, f"matched_discount={target_id}")
            add_note(discount_row, f"matched_to_item={target_id}")
def _costco_receipts(payload):
    """Return payload["data"]["receiptsWithCounts"]["receipts"] or [].

    Any level that is missing, JSON null, or not a mapping yields [].
    """
    node = payload
    for key in ("data", "receiptsWithCounts", "receipts"):
        if not isinstance(node, dict):
            return []
        node = node.get(key)
    return node or []


def iter_costco_rows(raw_dir):
    """Yield one enriched row per item of every receipt found under *raw_dir*.

    Files named "summary.json" / "summary_requests.json" and payloads that
    are not JSON objects are skipped. Explicit nulls in the payload are
    treated like missing keys: ``dict.get(key, default)`` only applies its
    default to absent keys, so a null "data", "receiptsWithCounts",
    "receipts" or "itemArray" would otherwise raise instead of being skipped.

    Raises:
        KeyError: if a receipt has no "transactionBarcode".
    """
    for path in discover_json_files(raw_dir):
        if path.name in {"summary.json", "summary_requests.json"}:
            continue
        payload = json.loads(path.read_text(encoding="utf-8"))
        if not isinstance(payload, dict):
            continue
        for receipt in _costco_receipts(payload):
            order_id = receipt["transactionBarcode"]
            order_date = receipt.get("transactionDate", "")
            items = receipt.get("itemArray") or []
            for line_no, item in enumerate(items, start=1):
                yield parse_costco_item(order_id, order_date, path, line_no, item)
def discover_json_files(raw_dir):
    """Return the sorted ``*.json`` files to process for *raw_dir*.

    Files directly inside *raw_dir* are preferred. When there are none and
    the directory is named "raw" with an existing parent, the parent's
    ``*.json`` files are returned instead. Otherwise the result is ``[]``.
    """
    directory = Path(raw_dir)
    search_roots = [directory]
    if directory.name == "raw" and directory.parent.exists():
        search_roots.append(directory.parent)

    for root in search_roots:
        found = sorted(root.glob("*.json"))
        if found:
            return found
    return []
def build_items_enriched(raw_dir):
    """Parse all receipts under *raw_dir* into discount-matched, sorted rows.

    Rows are ordered by order date, then order id, then numeric line number.
    """

    def sort_key(row):
        return (row["order_date"], row["order_id"], int(row["line_no"]))

    enriched = list(iter_costco_rows(raw_dir))
    match_costco_discounts(enriched)
    return sorted(enriched, key=sort_key)
def write_csv(path, rows):
    """Write *rows* to *path* as UTF-8 CSV with the ``OUTPUT_FIELDS`` header.

    Missing parent directories are created; an existing file is overwritten.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=OUTPUT_FIELDS)
        writer.writeheader()
        for record in rows:
            writer.writerow(record)
# CLI entry point: enrich the raw Costco receipts and write them to a CSV.
# (Kept as comments rather than a docstring so the --help text is unchanged.)
@click.command()
@click.option(
    "--input-dir",
    default=str(DEFAULT_INPUT_DIR),
    show_default=True,
    help="Directory containing Costco raw order json files.",
)
@click.option(
    "--output-csv",
    default=str(DEFAULT_OUTPUT_CSV),
    show_default=True,
    help="CSV path for enriched Costco item rows.",
)
def main(input_dir, output_csv):
    click.echo("legacy entrypoint: prefer normalize_costco_web.py for data-model outputs")
    source_dir = Path(input_dir)
    destination = Path(output_csv)
    enriched = build_items_enriched(source_dir)
    write_csv(destination, enriched)
    click.echo(f"wrote {len(enriched)} rows to {output_csv}")


if __name__ == "__main__":
    main()