# Costco receipt enrichment: legacy entrypoint producing items_enriched.csv.
import csv
|
|
import json
|
|
import re
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
|
|
import click
|
|
|
|
from enrich_giant import (
|
|
OUTPUT_FIELDS,
|
|
derive_normalized_quantity,
|
|
derive_price_fields,
|
|
format_decimal,
|
|
normalization_identity,
|
|
normalize_number,
|
|
normalize_unit,
|
|
normalize_whitespace,
|
|
singularize_tokens,
|
|
to_decimal,
|
|
)
|
|
|
|
|
|
# Version stamp written into every output row; bump when parsing rules change.
PARSER_VERSION = "costco-enrich-v1"
RETAILER = "costco"
DEFAULT_INPUT_DIR = Path("costco_output/raw")
DEFAULT_OUTPUT_CSV = Path("costco_output/items_enriched.csv")

# Code tokens embedded in receipt descriptions — these appear to be
# warehouse/logistics markers (tier/height/pallet, FY tags, WxH dims);
# exact meanings not confirmed from this file.
CODE_TOKEN_RE = re.compile(
    r"\b(?:SL\d+|T\d+H\d+|P\d+(?:/\d+)?|W\d+T\d+H\d+|FY\d+|CSPC#|C\d+T\d+H\d+|EC\d+T\d+H\d+|\d+X\d+)\b"
)
# "2/24 OZ" style pack fractions: group 1 = pack count, 2 = unit size, 3 = unit.
PACK_FRACTION_RE = re.compile(r"(?<![A-Z0-9])(\d+)\s*/\s*(\d+(?:\.\d+)?)\s*(OZ|LB|LBS|CT)\b")
# "5#" — retail shorthand for a weight in pounds (see parse_costco_size_and_pack).
HASH_SIZE_RE = re.compile(r"(?<![A-Z0-9])(\d+(?:\.\d+)?)#\b")
# "#123456" inline item-number references.
ITEM_CODE_RE = re.compile(r"#\w+\b")
# Dual-unit weight echoes like "1.36 KG/3 LB" (stripped from names, not parsed).
DUAL_WEIGHT_RE = re.compile(
    r"\b\d+(?:\.\d+)?\s*(?:KG|G|LB|LBS|OZ)\s*/\s*\d+(?:\.\d+)?\s*(?:KG|G|LB|LBS|OZ)\b"
)
# Slash-separated T../H../P.. logistics codes.
LOGISTICS_SLASH_RE = re.compile(r"\b(?:T\d+/H\d+(?:/P\d+)?/?|H\d+/P\d+/?|T\d+/H\d+/?)\b")
# "6-PACK" and "6 PACK" count markers.
PACK_DASH_RE = re.compile(r"(?<![A-Z0-9])(\d+)\s*-\s*PACK\b")
PACK_WORD_RE = re.compile(r"(?<![A-Z0-9])(\d+)\s*PACK\b")
# Generic size token, e.g. "24 OZ", "2 LB", "96 CT"; the LAST match wins.
SIZE_RE = re.compile(
    r"(?<![A-Z0-9])(\d+(?:\.\d+)?)\s*(OZ|LB|LBS|CT|KG|G|QT|QTS|PT|PTS|GAL|GALS|FL OZ|FLOZ)\b"
)
# Leading "/123456" on a discount line names the purchased item it offsets.
DISCOUNT_TARGET_RE = re.compile(r"^/\s*(\d+)\b")
def clean_costco_name(name):
    """Uppercase a raw receipt description and strip logistics noise."""
    text = normalize_whitespace(name).upper().replace('"', "")
    # Remove warehouse code tokens, then any "/<n> KG|G" metric weight echo.
    text = CODE_TOKEN_RE.sub(" ", text)
    text = re.sub(r"\s*/\s*\d+(?:\.\d+)?\s*(KG|G)\b", " ", text)
    return normalize_whitespace(text)
def combine_description(item):
    """Join the two receipt description fields into one normalized string."""
    fields = [item.get("itemDescription01"), item.get("itemDescription02")]
    joined = " ".join(str(field).strip() for field in fields if field)
    return normalize_whitespace(joined)
def parse_costco_size_and_pack(cleaned_name):
    """Extract (size_value, size_unit, pack_qty) from a cleaned item name.

    A "N/M UNIT" pack fraction is authoritative and short-circuits the rest;
    otherwise "#"-suffixed pounds, "-PACK"/"PACK" counts, and the LAST plain
    size token are combined (a later SIZE_RE match overrides the hash size).
    Empty strings mean "not found".
    """
    size_value, size_unit, pack_qty = "", "", ""

    fraction = PACK_FRACTION_RE.search(cleaned_name)
    if fraction:
        # e.g. "2/24 OZ": pack of 2, 24 oz each.
        return (
            normalize_number(fraction.group(2)),
            normalize_unit(fraction.group(3)),
            normalize_number(fraction.group(1)),
        )

    hash_size = HASH_SIZE_RE.search(cleaned_name)
    if hash_size:
        size_value = normalize_number(hash_size.group(1))
        size_unit = "lb"  # "#" is retail shorthand for pounds

    pack = PACK_DASH_RE.search(cleaned_name) or PACK_WORD_RE.search(cleaned_name)
    if pack:
        pack_qty = normalize_number(pack.group(1))

    sizes = SIZE_RE.findall(cleaned_name)
    if sizes:
        value, unit = sizes[-1]
        size_value = normalize_number(value)
        size_unit = "count" if unit == "CT" else normalize_unit(unit)

    return size_value, size_unit, pack_qty
def normalize_costco_name(cleaned_name):
    """Strip brand, size, pack, and logistics tokens from a cleaned name.

    Returns (normalized_name, brand_guess, size_value, size_unit, pack_qty).
    """
    brand = ""
    base = cleaned_name
    if base.startswith("KS "):
        # "KS" prefix — Kirkland Signature store brand.
        brand = "KS"
        base = normalize_whitespace(base[3:])

    size_value, size_unit, pack_qty = parse_costco_size_and_pack(base)
    if size_value and size_unit:
        # Scrub whichever pattern family supplied the size from the name.
        scrubber = PACK_FRACTION_RE if pack_qty else SIZE_RE
        base = scrubber.sub(" ", base)

    for noise in (
        DUAL_WEIGHT_RE,
        HASH_SIZE_RE,
        ITEM_CODE_RE,
        LOGISTICS_SLASH_RE,
        PACK_DASH_RE,
        PACK_WORD_RE,
    ):
        base = noise.sub(" ", base)
    base = normalize_whitespace(base)

    kept = []
    for token in base.split():
        if token in {"/", "-", "ORG"}:
            continue
        # Quirk preserved from the original: JIF lines drop PEANUT/BUTTER tokens.
        if token in {"PEANUT", "BUTTER"} and "JIF" in base:
            continue
        kept.append(token)
    base = singularize_tokens(" ".join(kept))
    return normalize_whitespace(base), brand, size_value, size_unit, pack_qty
def guess_measure_type(size_unit, pack_qty, is_discount_line):
    """Classify a line's pricing basis: weight, volume, count, or each.

    Discount lines are always priced "each"; otherwise the normalized size
    unit decides, with a non-empty pack quantity implying "count".
    """
    if is_discount_line:
        return "each"
    weight_units = {"lb", "oz", "g", "kg"}
    volume_units = {"ml", "l", "qt", "pt", "gal", "fl_oz"}
    if size_unit in weight_units:
        return "weight"
    if size_unit in volume_units:
        return "volume"
    return "count" if size_unit == "count" or pack_qty else "each"
def derive_costco_prices(item, measure_type, size_value, size_unit, pack_qty):
    """Derive (price_per_each, price_per_lb, price_per_oz) for one line.

    Empty strings mean a basis could not be derived. Returns all-empty when
    the line total is unparseable.
    """
    line_total = to_decimal(item.get("amount"))
    qty = to_decimal(item.get("unit"))
    parsed_size = to_decimal(size_value)
    parsed_pack = to_decimal(pack_qty) or 1

    per_each = per_lb = per_oz = ""
    if line_total is None:
        return per_each, per_lb, per_oz

    if measure_type in {"each", "count"} and qty not in (None, 0):
        per_each = format_decimal(line_total / qty)

    if parsed_size not in (None, 0):
        # Total units purchased = unit size * pack count * line quantity.
        total_units = parsed_size * parsed_pack * (qty or 1)
        if size_unit == "lb":
            unit_price = line_total / total_units
            per_lb = format_decimal(unit_price)
            per_oz = format_decimal(unit_price / 16)
        elif size_unit == "oz":
            unit_price = line_total / total_units
            per_oz = format_decimal(unit_price)
            per_lb = format_decimal(unit_price * 16)

    return per_each, per_lb, per_oz
def is_discount_item(item):
    """True when the line is a discount: negative amount/qty or "/" prefix."""
    if (to_decimal(item.get("amount")) or 0) < 0:
        return True
    if (to_decimal(item.get("unit")) or 0) < 0:
        return True
    return combine_description(item).startswith("/")
def discount_target_id(raw_name):
    """Return the item number a "/NNNN" discount line points at, or ""."""
    match = DISCOUNT_TARGET_RE.match(normalize_whitespace(raw_name))
    return match.group(1) if match else ""
def parse_costco_item(order_id, order_date, raw_path, line_no, item):
    """Convert one raw receipt item dict into a flat enriched output row.

    order_id: receipt transactionBarcode (unique per order).
    order_date: receipt transactionDate string (passed through normalized).
    raw_path: Path of the source JSON file, recorded for provenance.
    line_no: 1-based position of the item within the receipt.
    item: raw item dict from the receipt's itemArray.
    """
    raw_name = combine_description(item)
    cleaned_name = clean_costco_name(raw_name)
    item_name_norm, brand_guess, size_value, size_unit, pack_qty = normalize_costco_name(
        cleaned_name
    )
    is_discount_line = is_discount_item(item)
    # A leading "/" marks a coupon/discount pointer line.
    is_coupon_line = "true" if raw_name.startswith("/") else "false"
    measure_type = guess_measure_type(size_unit, pack_qty, is_discount_line)
    price_per_each, price_per_lb, price_per_oz = derive_costco_prices(
        item, measure_type, size_value, size_unit, pack_qty
    )
    # Stable per-line identity: retailer + order + line number.
    normalized_row_id = f"{RETAILER}:{order_id}:{line_no}"
    normalized_quantity, normalized_quantity_unit = derive_normalized_quantity(
        item.get("unit"),
        size_value,
        size_unit,
        pack_qty,
        measure_type,
    )
    identity_key, normalization_basis = normalization_identity(
        {
            "retailer": RETAILER,
            "normalized_row_id": normalized_row_id,
            "upc": "",  # Costco receipt payloads carry no UPC here
            "retailer_item_id": str(item.get("itemNumber", "")),
            "item_name_norm": item_name_norm,
            "size_value": size_value,
            "size_unit": size_unit,
            "pack_qty": pack_qty,
        }
    )
    price_fields = derive_price_fields(
        price_per_each,
        price_per_lb,
        price_per_oz,
        str(item.get("amount", "")),
        str(item.get("unit", "")),
        pack_qty,
    )

    return {
        "retailer": RETAILER,
        "order_id": str(order_id),
        "line_no": str(line_no),
        "normalized_row_id": normalized_row_id,
        "normalized_item_id": f"cnorm:{identity_key}",
        "normalization_basis": normalization_basis,
        "observed_item_key": normalized_row_id,
        "order_date": normalize_whitespace(order_date),
        "retailer_item_id": str(item.get("itemNumber", "")),
        "pod_id": "",
        "item_name": raw_name,
        "upc": "",
        # NOTE(review): category_id maps itemDepartmentNumber while category
        # maps transDepartmentNumber — confirm this crossover is intended.
        "category_id": str(item.get("itemDepartmentNumber", "")),
        "category": str(item.get("transDepartmentNumber", "")),
        # qty comes from the raw "unit" field; "unit" holds itemIdentifier —
        # presumably a sold-by code; verify against the receipt schema.
        "qty": str(item.get("unit", "")),
        "unit": str(item.get("itemIdentifier", "")),
        "unit_price": str(item.get("itemUnitPriceAmount", "")),
        "line_total": str(item.get("amount", "")),
        "picked_weight": "",
        "mvp_savings": "",
        "reward_savings": "",
        # Discount lines record their (negative) amount as coupon savings.
        "coupon_savings": str(item.get("amount", "")) if is_discount_line else "",
        "coupon_price": "",
        # Filled in later by match_costco_discounts when a discount matches.
        "matched_discount_amount": "",
        "net_line_total": str(item.get("amount", "")) if not is_discount_line else "",
        "image_url": "",
        "raw_order_path": raw_path.as_posix(),
        "item_name_norm": item_name_norm,
        "brand_guess": brand_guess,
        "variant": "",
        "size_value": size_value,
        "size_unit": size_unit,
        "pack_qty": pack_qty,
        "measure_type": measure_type,
        "normalized_quantity": normalized_quantity,
        "normalized_quantity_unit": normalized_quantity_unit,
        # Only "KS" (Kirkland) is ever guessed, so brand implies store brand.
        "is_store_brand": "true" if brand_guess else "false",
        "is_item": "false" if is_discount_line else "true",
        "is_fee": "false",
        "is_discount_line": "true" if is_discount_line else "false",
        "is_coupon_line": is_coupon_line,
        **price_fields,
        "parse_version": PARSER_VERSION,
        "parse_notes": "",
    }
def match_costco_discounts(rows):
    """Link each discount row to the purchase row it references, in place.

    Within each order, a discount line whose name starts with "/NNNN" targets
    the purchase line with retailer_item_id NNNN. A match is applied only when
    exactly one purchase row carries that id; otherwise the discount row is
    annotated as unmatched. Matched discounts accumulate onto the purchase
    row's matched_discount_amount and net_line_total (discount amounts are
    negative, so gross + discount lowers the net).
    """
    rows_by_order = defaultdict(list)
    for row in rows:
        rows_by_order[row["order_id"]].append(row)

    for order_rows in rows_by_order.values():
        # Index purchase (non-discount) rows by retailer item id.
        purchase_rows_by_item_id = defaultdict(list)
        for row in order_rows:
            if row.get("is_discount_line") == "true":
                continue
            retailer_item_id = row.get("retailer_item_id", "")
            if retailer_item_id:
                purchase_rows_by_item_id[retailer_item_id].append(row)

        for row in order_rows:
            if row.get("is_discount_line") != "true":
                continue
            target_id = discount_target_id(row.get("item_name", ""))
            if not target_id:
                continue
            matches = purchase_rows_by_item_id.get(target_id, [])
            if len(matches) != 1:
                # Ambiguous or missing target: record and skip.
                # .strip(";") drops the leading separator when notes were empty.
                row["parse_notes"] = normalize_whitespace(
                    f"{row.get('parse_notes', '')};discount_target_unmatched={target_id}"
                ).strip(";")
                continue

            purchase_row = matches[0]
            matched_discount = to_decimal(row.get("line_total"))
            gross_total = to_decimal(purchase_row.get("line_total"))
            existing_discount = to_decimal(purchase_row.get("matched_discount_amount")) or 0
            if matched_discount is None or gross_total is None:
                continue

            # Accumulate: several discount lines may hit the same purchase row.
            total_discount = existing_discount + matched_discount
            purchase_row["matched_discount_amount"] = format_decimal(total_discount)
            purchase_row["net_line_total"] = format_decimal(gross_total + total_discount)
            purchase_row["parse_notes"] = normalize_whitespace(
                f"{purchase_row.get('parse_notes', '')};matched_discount={target_id}"
            ).strip(";")
            row["parse_notes"] = normalize_whitespace(
                f"{row.get('parse_notes', '')};matched_to_item={target_id}"
            ).strip(";")
def iter_costco_rows(raw_dir):
    """Yield one enriched row per receipt line across all raw JSON files."""
    skip_names = {"summary.json", "summary_requests.json"}
    for path in discover_json_files(raw_dir):
        if path.name in skip_names:
            continue
        payload = json.loads(path.read_text(encoding="utf-8"))
        if not isinstance(payload, dict):
            continue
        receipts = payload.get("data", {}).get("receiptsWithCounts", {}).get("receipts", [])
        for receipt in receipts:
            barcode = receipt["transactionBarcode"]
            trans_date = receipt.get("transactionDate", "")
            for line_no, item in enumerate(receipt.get("itemArray", []), start=1):
                yield parse_costco_item(barcode, trans_date, path, line_no, item)
def discover_json_files(raw_dir):
    """Return the *.json files under raw_dir, sorted by path.

    When raw_dir is an empty "raw" subdirectory, fall back to its parent so
    an older flat layout keeps working. Returns [] when nothing is found.
    """
    raw_dir = Path(raw_dir)
    found = sorted(raw_dir.glob("*.json"))
    if found:
        return found
    parent = raw_dir.parent
    if raw_dir.name == "raw" and parent.exists():
        return sorted(parent.glob("*.json"))
    return []
def build_items_enriched(raw_dir):
    """Parse all raw orders, link discounts, and return rows in stable order."""
    rows = list(iter_costco_rows(raw_dir))
    match_costco_discounts(rows)

    def sort_key(row):
        # Deterministic order: date, then order, then numeric line position.
        return row["order_date"], row["order_id"], int(row["line_no"])

    rows.sort(key=sort_key)
    return rows
def write_csv(path, rows):
    """Write rows to path as CSV with the canonical OUTPUT_FIELDS header."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=OUTPUT_FIELDS)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)
@click.command()
@click.option(
    "--input-dir",
    default=str(DEFAULT_INPUT_DIR),
    show_default=True,
    help="Directory containing Costco raw order json files.",
)
@click.option(
    "--output-csv",
    default=str(DEFAULT_OUTPUT_CSV),
    show_default=True,
    help="CSV path for enriched Costco item rows.",
)
def main(input_dir, output_csv):
    """Enrich raw Costco order JSON into a flat items CSV (legacy path)."""
    click.echo("legacy entrypoint: prefer normalize_costco_web.py for data-model outputs")
    rows = build_items_enriched(Path(input_dir))
    write_csv(Path(output_csv), rows)
    click.echo(f"wrote {len(rows)} rows to {output_csv}")
# Script entry point: run the click command when executed directly.
if __name__ == "__main__":
    main()