580 lines
17 KiB
Python
580 lines
17 KiB
Python
import csv
|
|
import json
|
|
import re
|
|
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
|
|
from pathlib import Path
|
|
|
|
import click
|
|
|
|
|
|
# Version tag stamped into every output row's parse_version column.
PARSER_VERSION = "giant-enrich-v1"

# Retailer slug used in row ids and normalization identity keys.
RETAILER = "giant"

# Default location of raw order JSON files scraped from Giant.
DEFAULT_INPUT_DIR = Path("giant_output/raw")

# Default destination for the enriched items CSV.
DEFAULT_OUTPUT_CSV = Path("giant_output/items_enriched.csv")
|
|
|
|
# Canonical CSV column order; parse_item() must emit exactly these keys.
OUTPUT_FIELDS = [
    # Row identity
    "retailer",
    "order_id",
    "line_no",
    "normalized_row_id",
    "normalized_item_id",
    "normalization_basis",
    "observed_item_key",
    # Raw order/item attributes passed through from the payload
    "order_date",
    "retailer_item_id",
    "pod_id",
    "item_name",
    "upc",
    "category_id",
    "category",
    "qty",
    "unit",
    "unit_price",
    "line_total",
    "picked_weight",
    "mvp_savings",
    "reward_savings",
    "coupon_savings",
    "coupon_price",
    "matched_discount_amount",
    "net_line_total",
    "image_url",
    "raw_order_path",
    # Derived normalization fields
    "item_name_norm",
    "brand_guess",
    "variant",
    "size_value",
    "size_unit",
    "pack_qty",
    "measure_type",
    "normalized_quantity",
    "normalized_quantity_unit",
    "is_store_brand",
    "is_item",
    "is_fee",
    "is_discount_line",
    "is_coupon_line",
    # Derived per-unit prices with their basis labels
    "price_per_each",
    "price_per_each_basis",
    "price_per_count",
    "price_per_count_basis",
    "price_per_lb",
    "price_per_lb_basis",
    "price_per_oz",
    "price_per_oz_basis",
    # Parser bookkeeping
    "parse_version",
    "parse_notes",
]
|
|
|
|
# Leading name codes that mark store-brand items; maps prefix -> brand label.
STORE_BRAND_PREFIXES = {
    "SB": "SB",
    "NP": "NP",
}

# Tokens removed entirely during item-name normalization.
DROP_TOKENS = {"FRESH"}
|
|
|
|
# Token-level expansion table for receipt-style abbreviations and common
# misspellings. Identity entries (e.g. "APPLE": "APPLE") pin tokens that
# should never be altered by future fuzzy matching.
ABBREVIATIONS = {
    "APPLE": "APPLE",
    "APPLES": "APPLES",
    "APLE": "APPLE",
    "BASIL": "BASIL",
    "BLK": "BLACK",
    "BNLS": "BONELESS",
    "BRWN": "BROWN",
    "CARROTS": "CARROTS",
    "CHDR": "CHEDDAR",
    "CHICKEN": "CHICKEN",
    "CHOC": "CHOCOLATE",
    "CHS": "CHEESE",
    "CHSE": "CHEESE",
    "CHZ": "CHEESE",
    "CILANTRO": "CILANTRO",
    "CKI": "COOKIE",
    "CRSHD": "CRUSHED",
    "FLR": "FLOUR",
    "FRSH": "FRESH",
    "GALA": "GALA",
    "GRAHM": "GRAHAM",
    "HOT": "HOT",
    "HRSRDSH": "HORSERADISH",
    "IMP": "IMPORTED",
    "IQF": "IQF",
    "LENTILS": "LENTILS",
    "LG": "LARGE",
    "MLK": "MILK",
    "MSTRD": "MUSTARD",
    "ONION": "ONION",
    "ORG": "ORGANIC",
    "PEPPER": "PEPPER",
    "PEPPERS": "PEPPERS",
    "POT": "POTATO",
    "POTATO": "POTATO",
    "PPR": "PEPPER",
    "RICOTTA": "RICOTTA",
    "ROASTER": "ROASTER",
    "ROTINI": "ROTINI",
    "SCE": "SAUCE",
    "SLC": "SLICED",
    "SPINCH": "SPINACH",
    "SPNC": "SPINACH",
    "SPINACH": "SPINACH",
    "SQZ": "SQUEEZE",
    "SWT": "SWEET",
    "THYME": "THYME",
    "TOM": "TOMATO",
    "TOMS": "TOMATOES",
    "TRTL": "TORTILLA",
    "VEG": "VEGETABLE",
    "VINEGAR": "VINEGAR",
    "WHT": "WHITE",
    "WHOLE": "WHOLE",
    "YLW": "YELLOW",
    "YLWGLD": "YELLOW_GOLD",
}
|
|
|
|
# Names matching any of these patterns are fees/adjustments, not real items.
FEE_PATTERNS = [
    re.compile(r"\bBAG CHARGE\b"),
    re.compile(r"\bDISC AT TOTAL\b"),
]

# Size token, e.g. "12 OZ" / "1.5 L": group 1 = number, group 2 = unit.
# The lookbehind keeps the digits from binding to a preceding word/number.
SIZE_RE = re.compile(r"(?<![A-Z0-9])(\d+(?:\.\d+)?)(?:\s*)(OZ|Z|LB|LBS|ML|L|FZ|FL OZ|QT|PT|GAL|GA)\b")

# Pack-count token, e.g. "6 CT" / "2 PK": group 1 = count, group 2 = unit word.
PACK_RE = re.compile(r"(?<![A-Z0-9])(\d+(?:\.\d+)?)(?:\s*)(CT|PK|PKG|PACK)\b")
|
|
|
|
|
|
def to_decimal(value):
    """Coerce *value* to a Decimal, returning None for blanks or bad input."""
    if value is None or value == "":
        return None

    try:
        return Decimal(str(value))
    except (InvalidOperation, ValueError):
        return None
|
|
|
|
|
|
def format_decimal(value, places=4):
    """Render a Decimal rounded half-up to *places*, trailing zeros trimmed.

    Returns "" when *value* is None so callers can write it straight to CSV.
    """
    if value is None:
        return ""

    step = Decimal("1").scaleb(-places)
    rounded = value.quantize(step, rounding=ROUND_HALF_UP)
    # "f" formatting avoids scientific notation after normalize().
    return format(rounded.normalize(), "f")
|
|
|
|
|
|
def normalize_whitespace(value):
    """Collapse all whitespace runs to single spaces; falsy input becomes ''."""
    text = str(value or "")
    return " ".join(text.split())
|
|
|
|
|
|
def clean_item_name(name):
    """Upper-case a raw item name and strip '+'/'PLU#' prefixes and '#' chars."""
    text = normalize_whitespace(name).upper()
    text = re.sub(r"^\+", "", text)
    text = re.sub(r"^PLU#\d+\s*", "", text)
    return normalize_whitespace(text.replace("#", " "))
|
|
|
|
|
|
def extract_store_brand_prefix(cleaned_name):
    """Return (prefix, brand) when the name starts with a store-brand code.

    The code must be the whole name or followed by a space; otherwise ("", "").
    """
    for code, label in STORE_BRAND_PREFIXES.items():
        if cleaned_name == code or cleaned_name.startswith(code + " "):
            return code, label
    return "", ""
|
|
|
|
|
|
def extract_image_url(item):
    """Pick the largest non-empty image URL from an item payload.

    Handles both the dict form ({"xlarge": ..., ...}) and a bare URL string;
    returns "" when no usable URL is present.
    """
    image = item.get("image")
    if isinstance(image, dict):
        candidates = (image.get(size) for size in ("xlarge", "large", "medium", "small"))
        return next((url for url in candidates if url), "")
    if isinstance(image, str):
        return image
    return ""
|
|
|
|
|
|
def parse_size_and_pack(cleaned_name):
    """Extract (size_value, size_unit, pack_qty) strings from an item name.

    The last size/pack token wins when several appear; missing pieces are "".
    """
    size_value = size_unit = pack_qty = ""

    sizes = SIZE_RE.findall(cleaned_name)
    if sizes:
        number, unit = sizes[-1]
        size_value = normalize_number(number)
        size_unit = normalize_unit(unit)

    packs = PACK_RE.findall(cleaned_name)
    if packs:
        pack_qty = normalize_number(packs[-1][0])

    return size_value, size_unit, pack_qty
|
|
|
|
|
|
def normalize_number(value):
    """Canonicalize a numeric string (no exponent, no trailing zeros); '' on failure."""
    parsed = to_decimal(value)
    return "" if parsed is None else format(parsed.normalize(), "f")
|
|
|
|
|
|
def normalize_unit(unit):
    """Map raw unit spellings to canonical lowercase unit codes.

    Unknown units fall through as their lowercase form.
    """
    canonical = {
        "Z": "oz",
        "OZ": "oz",
        "FZ": "fl_oz",
        "FL OZ": "fl_oz",
        "FLOZ": "fl_oz",
        "LB": "lb",
        "LBS": "lb",
        "ML": "ml",
        "L": "l",
        "QT": "qt",
        "QTS": "qt",
        "PT": "pt",
        "PTS": "pt",
        "GAL": "gal",
        "GALS": "gal",
        "GA": "gal",
    }
    key = normalize_whitespace(unit).upper()
    return canonical.get(key, key.lower())
|
|
|
|
|
|
def strip_measure_tokens(cleaned_name):
    """Remove size and pack tokens so only the descriptive words remain."""
    stripped = PACK_RE.sub(" ", SIZE_RE.sub(" ", cleaned_name))
    return normalize_whitespace(stripped)
|
|
|
|
|
|
def expand_token(token):
    """Expand a known abbreviation; unknown tokens pass through unchanged."""
    if token in ABBREVIATIONS:
        return ABBREVIATIONS[token]
    return token
|
|
|
|
|
|
def normalize_item_name(cleaned_name):
    """Build the normalized item name used for cross-order matching.

    Strips any store-brand prefix and size/pack tokens, expands abbreviations,
    drops filler words, then singularizes known plurals.
    """
    prefix, _ = extract_store_brand_prefix(cleaned_name)
    base = cleaned_name
    if prefix:
        base = normalize_whitespace(base[len(prefix):])

    kept = []
    for token in strip_measure_tokens(base).split():
        word = expand_token(token)
        if word in DROP_TOKENS:
            continue
        kept.append(word)

    joined = " ".join(word for word in kept if word)
    return singularize_tokens(normalize_whitespace(joined))
|
|
|
|
|
|
# Known plural -> singular token replacements; hoisted to module level so the
# table is built once instead of on every call (this runs per output row).
_SINGULAR_MAP = {
    "APPLES": "APPLE",
    "BANANAS": "BANANA",
    "BERRIES": "BERRY",
    "EGGS": "EGG",
    "LEMONS": "LEMON",
    "LIMES": "LIME",
    "MANDARINS": "MANDARIN",
    "PEPPERS": "PEPPER",
    "STRAWBERRIES": "STRAWBERRY",
}


def singularize_tokens(text):
    """Replace known plural tokens with singular forms, token by token.

    Tokens not in the map are left untouched; whitespace is re-normalized.
    """
    tokens = (_SINGULAR_MAP.get(token, token) for token in text.split())
    return normalize_whitespace(" ".join(tokens))
|
|
|
|
|
|
def guess_measure_type(item, size_unit, pack_qty):
    """Classify a line as "weight", "volume", "count", "each", or "".

    Uses the payload's unit code and picked weight first, then the size unit
    parsed from the name, then the pack count, then the shipped quantity.
    """
    unit_code = normalize_whitespace(item.get("lbEachCd")).upper()
    weight = to_decimal(item.get("totalPickedWeight"))
    quantity = to_decimal(item.get("shipQy"))

    # A positive picked weight implies weighing unless the unit says "EA".
    weighed = weight is not None and weight > 0 and unit_code != "EA"
    if unit_code == "LB" or weighed:
        return "weight"
    if size_unit in ("lb", "oz"):
        return "weight"
    if size_unit in ("ml", "l", "qt", "pt", "gal", "fl_oz"):
        return "volume"
    if pack_qty:
        return "count"
    if unit_code == "EA" or (quantity is not None and quantity > 0):
        return "each"
    return ""
|
|
|
|
|
|
def is_fee_item(cleaned_name):
    """True when the cleaned name matches a known fee/adjustment pattern."""
    for pattern in FEE_PATTERNS:
        if pattern.search(cleaned_name):
            return True
    return False
|
|
|
|
|
|
def derive_prices(item, measure_type, size_value="", size_unit="", pack_qty=""):
    """Compute per-each, per-lb, and per-oz prices for one item line.

    Returns a (price_per_each, price_per_lb, price_per_oz) tuple of formatted
    strings; any price that cannot be derived from the payload stays "".
    """
    qty = to_decimal(item.get("shipQy"))
    line_total = to_decimal(item.get("groceryAmount"))
    picked_weight = to_decimal(item.get("totalPickedWeight"))
    parsed_size = to_decimal(size_value)
    # A missing or zero pack count is treated as a single pack.
    parsed_pack = to_decimal(pack_qty) or Decimal("1")

    price_per_each = ""

    price_per_lb = ""

    price_per_oz = ""

    # Without a line total there is nothing to divide.
    if line_total is None:

        return price_per_each, price_per_lb, price_per_oz

    if measure_type == "each" and qty not in (None, Decimal("0")):

        price_per_each = format_decimal(line_total / qty)

    if measure_type == "count" and qty not in (None, Decimal("0")):

        price_per_each = format_decimal(line_total / qty)

    # Preferred weight basis: the actual picked weight reported by the store.
    if measure_type == "weight" and picked_weight not in (None, Decimal("0")):

        per_lb = line_total / picked_weight

        price_per_lb = format_decimal(per_lb)

        price_per_oz = format_decimal(per_lb / Decimal("16"))

        return price_per_each, price_per_lb, price_per_oz

    # Fallback weight basis: size parsed from the name times qty and pack.
    if measure_type == "weight" and parsed_size not in (None, Decimal("0")) and qty not in (None, Decimal("0")):

        total_units = qty * parsed_pack * parsed_size

        if size_unit == "lb":

            per_lb = line_total / total_units

            price_per_lb = format_decimal(per_lb)

            price_per_oz = format_decimal(per_lb / Decimal("16"))

        elif size_unit == "oz":

            per_oz = line_total / total_units

            price_per_oz = format_decimal(per_oz)

            price_per_lb = format_decimal(per_oz * Decimal("16"))

    return price_per_each, price_per_lb, price_per_oz
|
|
|
|
|
|
def derive_normalized_quantity(qty, size_value, size_unit, pack_qty, measure_type, picked_weight=""):
    """Derive a (quantity, unit) pair for cross-item comparison.

    Preference order: parsed size x qty x pack in the size's unit, then picked
    weight in lb for weighed items, then pack-expanded count, then plain each
    quantity. Returns ("", "") when nothing can be derived.
    """
    parsed_qty = to_decimal(qty)

    parsed_size = to_decimal(size_value)

    parsed_pack = to_decimal(pack_qty)

    parsed_picked_weight = to_decimal(picked_weight)

    # Total multiplier = shipped quantity times pack count (pack defaults to 1).
    total_multiplier = None

    if parsed_qty not in (None, Decimal("0")):

        total_multiplier = parsed_qty * (parsed_pack or Decimal("1"))

    if (

        parsed_size not in (None, Decimal("0"))

        and size_unit

        and total_multiplier not in (None, Decimal("0"))

    ):

        return format_decimal(parsed_size * total_multiplier), size_unit

    if measure_type == "weight" and parsed_picked_weight not in (None, Decimal("0")):

        return format_decimal(parsed_picked_weight), "lb"

    if measure_type == "count" and total_multiplier not in (None, Decimal("0")):

        return format_decimal(total_multiplier), "count"

    if measure_type == "each" and parsed_qty not in (None, Decimal("0")):

        return format_decimal(parsed_qty), "each"

    return "", ""
|
|
|
|
|
|
def derive_price_fields(price_per_each, price_per_lb, price_per_oz, line_total, qty, pack_qty):
    """Bundle per-unit prices with their basis labels into output-row fields.

    price_per_count is derived here (line total over qty x pack); the other
    prices are passed through and only given a basis label when non-empty.
    """
    total = to_decimal(line_total)
    quantity = to_decimal(qty)
    pack = to_decimal(pack_qty)

    price_per_count = ""
    price_per_count_basis = ""
    zeroish = (None, Decimal("0"))
    if total is not None and quantity not in zeroish and pack not in zeroish:
        price_per_count = format_decimal(total / (quantity * pack))
        price_per_count_basis = "line_total_over_pack_qty"

    return {
        "price_per_each": price_per_each,
        "price_per_each_basis": "line_total_over_qty" if price_per_each else "",
        "price_per_count": price_per_count,
        "price_per_count_basis": price_per_count_basis,
        "price_per_lb": price_per_lb,
        "price_per_lb_basis": "parsed_or_picked_weight" if price_per_lb else "",
        "price_per_oz": price_per_oz,
        "price_per_oz_basis": "parsed_or_picked_weight" if price_per_oz else "",
    }
|
|
|
|
|
|
def normalization_identity(row):
    """Choose the strongest available identity key for a normalized row.

    Preference order: UPC, retailer item id, normalized name+size+pack,
    then the row's own id. Returns (identity_key, basis_label).
    """
    if row.get("upc"):
        return f"{row['retailer']}|upc={row['upc']}", "exact_upc"

    if row.get("retailer_item_id"):
        return f"{row['retailer']}|retailer_item_id={row['retailer_item_id']}", "exact_retailer_item_id"

    if row.get("item_name_norm"):
        parts = [
            row["retailer"],
            f"name={row['item_name_norm']}",
            f"size={row.get('size_value', '')}",
            f"unit={row.get('size_unit', '')}",
            f"pack={row.get('pack_qty', '')}",
        ]
        return "|".join(parts), "exact_name_size_pack"

    return row["normalized_row_id"], "row_identity"
|
|
|
|
|
|
def parse_item(order_id, order_date, raw_path, line_no, item):
    """Convert one raw order-item dict into an enriched output row.

    Runs the full normalization pipeline (name cleaning, size/pack parsing,
    store-brand detection, measure-type classification, per-unit pricing,
    identity derivation) and returns a dict keyed by OUTPUT_FIELDS.
    """
    cleaned_name = clean_item_name(item.get("itemName", ""))

    size_value, size_unit, pack_qty = parse_size_and_pack(cleaned_name)

    prefix, brand_guess = extract_store_brand_prefix(cleaned_name)

    normalized_name = normalize_item_name(cleaned_name)

    measure_type = guess_measure_type(item, size_unit, pack_qty)

    price_per_each, price_per_lb, price_per_oz = derive_prices(

        item,

        measure_type,

        size_value=size_value,

        size_unit=size_unit,

        pack_qty=pack_qty,

    )

    is_fee = is_fee_item(cleaned_name)

    # Human-readable notes about what the parser detected, joined with ";".
    parse_notes = []

    if prefix:

        parse_notes.append(f"store_brand_prefix={prefix}")

    if is_fee:

        parse_notes.append("fee_item")

    if size_value and not size_unit:

        parse_notes.append("size_without_unit")

    # Stable per-line id: retailer:order:line.
    normalized_row_id = f"{RETAILER}:{order_id}:{line_no}"

    normalized_quantity, normalized_quantity_unit = derive_normalized_quantity(

        item.get("shipQy"),

        size_value,

        size_unit,

        pack_qty,

        measure_type,

        item.get("totalPickedWeight"),

    )

    identity_key, normalization_basis = normalization_identity(

        {

            "retailer": RETAILER,

            "normalized_row_id": normalized_row_id,

            "upc": stringify(item.get("primUpcCd")),

            "retailer_item_id": stringify(item.get("podId")),

            "item_name_norm": normalized_name,

            "size_value": size_value,

            "size_unit": size_unit,

            "pack_qty": pack_qty,

        }

    )

    price_fields = derive_price_fields(

        price_per_each,

        price_per_lb,

        price_per_oz,

        stringify(item.get("groceryAmount")),

        stringify(item.get("shipQy")),

        pack_qty,

    )

    # Keys below must stay in sync with OUTPUT_FIELDS.
    return {

        "retailer": RETAILER,

        "order_id": str(order_id),

        "line_no": str(line_no),

        "normalized_row_id": normalized_row_id,

        "normalized_item_id": f"gnorm:{identity_key}",

        "normalization_basis": normalization_basis,

        "observed_item_key": normalized_row_id,

        "order_date": normalize_whitespace(order_date),

        "retailer_item_id": stringify(item.get("podId")),

        "pod_id": stringify(item.get("podId")),

        "item_name": stringify(item.get("itemName")),

        "upc": stringify(item.get("primUpcCd")),

        "category_id": stringify(item.get("categoryId")),

        "category": stringify(item.get("categoryDesc")),

        "qty": stringify(item.get("shipQy")),

        "unit": stringify(item.get("lbEachCd")),

        "unit_price": stringify(item.get("unitPrice")),

        "line_total": stringify(item.get("groceryAmount")),

        "picked_weight": stringify(item.get("totalPickedWeight")),

        "mvp_savings": stringify(item.get("mvpSavings")),

        "reward_savings": stringify(item.get("rewardSavings")),

        "coupon_savings": stringify(item.get("couponSavings")),

        "coupon_price": stringify(item.get("couponPrice")),

        # Filled by a later matching pass, not by this parser.
        "matched_discount_amount": "",

        "net_line_total": stringify(item.get("totalPrice")),

        "image_url": extract_image_url(item),

        "raw_order_path": raw_path.as_posix(),

        "item_name_norm": normalized_name,

        "brand_guess": brand_guess,

        "variant": "",

        "size_value": size_value,

        "size_unit": size_unit,

        "pack_qty": pack_qty,

        "measure_type": measure_type,

        "normalized_quantity": normalized_quantity,

        "normalized_quantity_unit": normalized_quantity_unit,

        "is_store_brand": "true" if bool(prefix) else "false",

        # A fee line is not an item; the two flags are mutually exclusive.
        "is_item": "false" if is_fee else "true",

        "is_fee": "true" if is_fee else "false",

        "is_discount_line": "false",

        "is_coupon_line": "false",

        **price_fields,

        "parse_version": PARSER_VERSION,

        "parse_notes": ";".join(parse_notes),

    }
|
|
|
|
|
|
def stringify(value):
    """Return '' for None, otherwise str(value)."""
    return "" if value is None else str(value)
|
|
|
|
|
|
def iter_order_rows(raw_dir):
    """Yield one enriched row per item line across all order JSONs in *raw_dir*."""
    # history.json is the order index, not an order payload; skip it.
    order_paths = (p for p in sorted(raw_dir.glob("*.json")) if p.name != "history.json")

    for order_path in order_paths:
        payload = json.loads(order_path.read_text(encoding="utf-8"))
        order_id = payload.get("orderId", order_path.stem)
        order_date = payload.get("orderDate", "")

        for line_no, item in enumerate(payload.get("items", []), start=1):
            yield parse_item(order_id, order_date, order_path, line_no, item)
|
|
|
|
|
|
def build_items_enriched(raw_dir):
    """Collect all item rows, sorted by (order_date, order_id, line number)."""
    def sort_key(row):
        return row["order_date"], row["order_id"], int(row["line_no"])

    return sorted(iter_order_rows(raw_dir), key=sort_key)
|
|
|
|
|
|
def write_csv(path, rows):
    """Write *rows* to *path* as CSV using the canonical column order."""
    path.parent.mkdir(parents=True, exist_ok=True)

    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=OUTPUT_FIELDS)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)
|
|
|
|
|
|
@click.command()
@click.option(
    "--input-dir",
    default=str(DEFAULT_INPUT_DIR),
    show_default=True,
    help="Directory containing Giant raw order json files.",
)
@click.option(
    "--output-csv",
    default=str(DEFAULT_OUTPUT_CSV),
    show_default=True,
    help="CSV path for enriched Giant item rows.",
)
def main(input_dir, output_csv):
    """Enrich Giant raw order JSON files into a flat CSV of item rows."""
    click.echo("legacy entrypoint: prefer normalize_giant_web.py for data-model outputs")

    source_dir = Path(input_dir)
    destination = Path(output_csv)

    if not source_dir.exists():
        raise click.ClickException(f"input dir does not exist: {source_dir}")

    rows = build_items_enriched(source_dir)
    write_csv(destination, rows)
    click.echo(f"wrote {len(rows)} rows to {destination}")
|
|
|
|
|
|
# CLI entry point: run the legacy enrichment command when executed directly.
if __name__ == "__main__":

    main()
|