# Source: scrape-giant/enrich_giant.py (file-viewer listing: 563 lines, 16 KiB, Python)

import csv
import json
import re
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from pathlib import Path
import click
# Written to the "parse_version" column of every enriched row.
PARSER_VERSION = "giant-enrich-v1"
# Retailer tag written to the "retailer" column and used as the prefix of
# the per-row id built in parse_item.
RETAILER = "giant"
# Defaults for the --input-dir and --output-csv command line options.
DEFAULT_INPUT_DIR = Path("giant_output/raw")
DEFAULT_OUTPUT_CSV = Path("giant_output/items_enriched.csv")
# Column names, in output order, of the enriched CSV. write_csv passes this
# list to csv.DictWriter as ``fieldnames``; parse_item returns a dict with
# exactly these keys.
OUTPUT_FIELDS = [
    "retailer",
    "order_id",
    "line_no",
    "normalized_row_id",
    "normalized_item_id",
    "normalization_basis",
    "observed_item_key",
    "order_date",
    "retailer_item_id",
    "pod_id",
    "item_name",
    "upc",
    "category_id",
    "category",
    "qty",
    "unit",
    "unit_price",
    "line_total",
    "picked_weight",
    "mvp_savings",
    "reward_savings",
    "coupon_savings",
    "coupon_price",
    "matched_discount_amount",
    "net_line_total",
    "image_url",
    "raw_order_path",
    "item_name_norm",
    "brand_guess",
    "variant",
    "size_value",
    "size_unit",
    "pack_qty",
    "measure_type",
    "normalized_quantity",
    "normalized_quantity_unit",
    "is_store_brand",
    "is_item",
    "is_fee",
    "is_discount_line",
    "is_coupon_line",
    "price_per_each",
    "price_per_each_basis",
    "price_per_count",
    "price_per_count_basis",
    "price_per_lb",
    "price_per_lb_basis",
    "price_per_oz",
    "price_per_oz_basis",
    "parse_version",
    "parse_notes",
]
# Leading name tokens recognised by extract_store_brand_prefix, mapped to the
# value reported in the "brand_guess" column.
STORE_BRAND_PREFIXES = {
    "SB": "SB",
    "NP": "NP",
}
# Tokens removed from the normalized item name (checked after abbreviation
# expansion, so "FRSH" is dropped as well).
DROP_TOKENS = {"FRESH"}
# Per-token expansion table used by expand_token. Tokens that are not listed
# pass through unchanged, so the identity entries (e.g. "APPLE": "APPLE")
# do not alter the output.
ABBREVIATIONS = {
    "APPLE": "APPLE",
    "APPLES": "APPLES",
    "APLE": "APPLE",
    "BASIL": "BASIL",
    "BLK": "BLACK",
    "BNLS": "BONELESS",
    "BRWN": "BROWN",
    "CARROTS": "CARROTS",
    "CHDR": "CHEDDAR",
    "CHICKEN": "CHICKEN",
    "CHOC": "CHOCOLATE",
    "CHS": "CHEESE",
    "CHSE": "CHEESE",
    "CHZ": "CHEESE",
    "CILANTRO": "CILANTRO",
    "CKI": "COOKIE",
    "CRSHD": "CRUSHED",
    "FLR": "FLOUR",
    "FRSH": "FRESH",
    "GALA": "GALA",
    "GRAHM": "GRAHAM",
    "HOT": "HOT",
    "HRSRDSH": "HORSERADISH",
    "IMP": "IMPORTED",
    "IQF": "IQF",
    "LENTILS": "LENTILS",
    "LG": "LARGE",
    "MLK": "MILK",
    "MSTRD": "MUSTARD",
    "ONION": "ONION",
    "ORG": "ORGANIC",
    "PEPPER": "PEPPER",
    "PEPPERS": "PEPPERS",
    "POT": "POTATO",
    "POTATO": "POTATO",
    "PPR": "PEPPER",
    "RICOTTA": "RICOTTA",
    "ROASTER": "ROASTER",
    "ROTINI": "ROTINI",
    "SCE": "SAUCE",
    "SLC": "SLICED",
    "SPINCH": "SPINACH",
    "SPNC": "SPINACH",
    "SPINACH": "SPINACH",
    "SQZ": "SQUEEZE",
    "SWT": "SWEET",
    "THYME": "THYME",
    "TOM": "TOMATO",
    "TOMS": "TOMATOES",
    "TRTL": "TORTILLA",
    "VEG": "VEGETABLE",
    "VINEGAR": "VINEGAR",
    "WHT": "WHITE",
    "WHOLE": "WHOLE",
    "YLW": "YELLOW",
    "YLWGLD": "YELLOW_GOLD",
}
# A cleaned item name matching any of these patterns is flagged as a fee
# line (is_fee="true", is_item="false") by parse_item.
FEE_PATTERNS = [
    re.compile(r"\bBAG CHARGE\b"),
    re.compile(r"\bDISC AT TOTAL\b"),
]
# A number followed (optionally after whitespace) by a size unit. The
# lookbehind stops the match from starting inside an alphanumeric token.
# Patterns assume upper-cased input, as produced by clean_item_name.
SIZE_RE = re.compile(r"(?<![A-Z0-9])(\d+(?:\.\d+)?)(?:\s*)(OZ|Z|LB|LBS|ML|L|FZ|FL OZ|QT|PT|GAL|GA)\b")
# A number followed by a pack/count marker, e.g. "6PK" or "12 CT".
PACK_RE = re.compile(r"(?<![A-Z0-9])(\d+(?:\.\d+)?)(?:\s*)(CT|PK|PKG|PACK)\b")
def to_decimal(value):
    """Convert a raw scalar to a finite ``Decimal``, or return ``None``.

    ``value`` is passed through ``str()`` before parsing, so ints, floats and
    numeric strings are all accepted.

    Returns ``None`` when ``value`` is ``None``, the empty string, not
    parseable as a number, or parses to NaN / Infinity.
    """
    if value in ("", None):
        return None
    try:
        parsed = Decimal(str(value))
    except (InvalidOperation, ValueError):
        return None
    # "NaN" and "Infinity" are valid Decimal literals (and json.loads accepts
    # them by default), but ordering comparisons and quantize() on them raise
    # InvalidOperation further down the pipeline. Treat them as missing.
    if not parsed.is_finite():
        return None
    return parsed
def format_decimal(value, places=4):
    """Render a ``Decimal`` as plain fixed-point text.

    The value is rounded half-up to ``places`` decimal places and trailing
    zeros are removed. ``None`` is rendered as the empty string.
    """
    if value is None:
        return ""
    step = Decimal("1").scaleb(-places)
    rounded = value.quantize(step, rounding=ROUND_HALF_UP)
    # normalize() drops trailing zeros; the "f" presentation type keeps the
    # result out of exponent notation (e.g. "100" rather than "1E+2").
    return f"{rounded.normalize():f}"
def normalize_whitespace(value):
    """Return ``value`` as text with whitespace runs collapsed to one space.

    Falsy values (``None``, ``""``, ``0``) become the empty string.
    """
    text = str(value or "")
    # split() with no separator already discards leading/trailing whitespace.
    words = text.split()
    return " ".join(words)
def clean_item_name(name):
    """Upper-case an item name and strip retailer markup from it.

    Removes one leading "+", a leading "PLU#<digits>" tag, and turns any
    remaining "#" into a space before collapsing whitespace again.
    """
    text = normalize_whitespace(name).upper()
    if text.startswith("+"):
        text = text[1:]
    text = re.sub(r"^PLU#\d+\s*", "", text)
    text = text.replace("#", " ")
    return normalize_whitespace(text)
def extract_store_brand_prefix(cleaned_name):
    """Return ``(prefix, brand)`` when the name starts with a store-brand token.

    The prefix must be the whole name or be followed by a space. Returns
    ``("", "")`` when no entry of STORE_BRAND_PREFIXES matches.
    """
    # Appending a space to both sides folds the "exact match" and the
    # "prefix followed by a space" cases into a single comparison.
    padded_name = f"{cleaned_name} "
    for prefix, brand in STORE_BRAND_PREFIXES.items():
        if padded_name.startswith(f"{prefix} "):
            return prefix, brand
    return "", ""
def extract_image_url(item):
    """Return the image reference stored under ``item["image"]``, or ``""``.

    A string is returned as-is. For a dict, the first truthy value among the
    keys "xlarge", "large", "medium", "small" (in that order) is returned.
    """
    image = item.get("image")
    if isinstance(image, str):
        return image
    if not isinstance(image, dict):
        return ""
    candidates = (image.get(size) for size in ("xlarge", "large", "medium", "small"))
    return next((candidate for candidate in candidates if candidate), "")
def parse_size_and_pack(cleaned_name):
    """Extract ``(size_value, size_unit, pack_qty)`` from a cleaned item name.

    When a name contains several size or pack expressions, the last one of
    each kind is used. Parts that are not found are empty strings.
    """
    size_value = size_unit = pack_qty = ""

    # findall yields (number, unit) tuples, one per match, in text order.
    size_hits = SIZE_RE.findall(cleaned_name)
    if size_hits:
        number, unit = size_hits[-1]
        size_value = normalize_number(number)
        size_unit = normalize_unit(unit)

    pack_hits = PACK_RE.findall(cleaned_name)
    if pack_hits:
        pack_qty = normalize_number(pack_hits[-1][0])

    return size_value, size_unit, pack_qty
def normalize_number(value):
    """Return ``value`` as canonical decimal text without trailing zeros.

    Values that ``to_decimal`` cannot parse give the empty string.
    """
    parsed = to_decimal(value)
    return "" if parsed is None else f"{parsed.normalize():f}"
def normalize_unit(unit):
    """Map a unit token as written in an item name to its canonical spelling.

    Unknown units are returned lower-cased with whitespace collapsed.
    """
    canonical_units = {
        "Z": "oz",
        "OZ": "oz",
        "FZ": "fl_oz",
        "FL OZ": "fl_oz",
        "LB": "lb",
        "LBS": "lb",
        "ML": "ml",
        "L": "l",
        "QT": "qt",
        "PT": "pt",
        "GAL": "gal",
        "GA": "gal",
    }
    key = normalize_whitespace(unit).upper()
    if key in canonical_units:
        return canonical_units[key]
    return key.lower()
def strip_measure_tokens(cleaned_name):
    """Remove size and pack expressions from a name, then tidy whitespace."""
    remaining = cleaned_name
    # Sizes are removed before pack counts, each replaced by a single space.
    for pattern in (SIZE_RE, PACK_RE):
        remaining = pattern.sub(" ", remaining)
    return normalize_whitespace(remaining)
def expand_token(token):
    """Return the ABBREVIATIONS expansion of ``token``, or ``token`` itself."""
    if token in ABBREVIATIONS:
        return ABBREVIATIONS[token]
    return token
def normalize_item_name(cleaned_name):
    """Build the comparison-friendly item name from a cleaned name.

    Steps, in order: drop the store-brand prefix, remove size/pack
    expressions, expand abbreviations, drop DROP_TOKENS, singularize.
    """
    prefix, _ = extract_store_brand_prefix(cleaned_name)
    remainder = cleaned_name
    if prefix:
        remainder = normalize_whitespace(cleaned_name[len(prefix):])
    remainder = strip_measure_tokens(remainder)

    expanded_words = (expand_token(word) for word in remainder.split())
    # Dropping happens on the expanded form, so abbreviations of a dropped
    # word are removed too. Empty expansions are discarded as well.
    kept_words = [word for word in expanded_words if word not in DROP_TOKENS and word]

    joined = normalize_whitespace(" ".join(kept_words))
    return singularize_tokens(joined)
def singularize_tokens(text):
    """Replace known plural words in ``text`` with their singular form.

    Only the words listed in the table below are changed; matching is exact
    and per whitespace-separated token.
    """
    plural_to_singular = {
        "APPLES": "APPLE",
        "BANANAS": "BANANA",
        "BERRIES": "BERRY",
        "EGGS": "EGG",
        "LEMONS": "LEMON",
        "LIMES": "LIME",
        "MANDARINS": "MANDARIN",
        "PEPPERS": "PEPPER",
        "STRAWBERRIES": "STRAWBERRY",
    }
    rewritten = " ".join(plural_to_singular.get(word, word) for word in text.split())
    return normalize_whitespace(rewritten)
def guess_measure_type(item, size_unit, pack_qty):
    """Classify a line as "weight", "volume", "count", "each" or "".

    Signals are checked in priority order: the item's "lbEachCd" /
    "totalPickedWeight" fields, then the parsed size unit, then the parsed
    pack quantity, and finally "lbEachCd" == "EA" or a positive "shipQy".
    """
    sold_by = normalize_whitespace(item.get("lbEachCd")).upper()
    picked_weight = to_decimal(item.get("totalPickedWeight"))

    if sold_by == "LB":
        return "weight"
    if picked_weight is not None and picked_weight > 0 and sold_by != "EA":
        return "weight"

    type_by_size_unit = {
        "lb": "weight",
        "oz": "weight",
        "ml": "volume",
        "l": "volume",
        "qt": "volume",
        "pt": "volume",
        "gal": "volume",
        "fl_oz": "volume",
    }
    if size_unit in type_by_size_unit:
        return type_by_size_unit[size_unit]

    if pack_qty:
        return "count"

    if sold_by == "EA":
        return "each"
    shipped = to_decimal(item.get("shipQy"))
    if shipped is not None and shipped > 0:
        return "each"
    return ""
def is_fee_item(cleaned_name):
    """Return True when the cleaned name matches one of FEE_PATTERNS."""
    for pattern in FEE_PATTERNS:
        if pattern.search(cleaned_name):
            return True
    return False
def derive_prices(item, measure_type, size_value="", size_unit="", pack_qty=""):
    """Compute ``(price_per_each, price_per_lb, price_per_oz)`` as text.

    All prices are derived from the item's "groceryAmount". Values that
    cannot be computed are returned as empty strings.

    * "each" / "count": groceryAmount divided by "shipQy".
    * "weight": groceryAmount divided by "totalPickedWeight" when that is
      present and non-zero; otherwise divided by
      shipQy * pack_qty * size_value for a parsed "lb" or "oz" size.
    """
    missing_or_zero = (None, Decimal("0"))

    line_total = to_decimal(item.get("groceryAmount"))
    if line_total is None:
        return "", "", ""

    qty = to_decimal(item.get("shipQy"))
    qty_usable = qty not in missing_or_zero

    if measure_type != "weight":
        per_each_text = ""
        if measure_type in ("each", "count") and qty_usable:
            per_each_text = format_decimal(line_total / qty)
        return per_each_text, "", ""

    ounces_per_pound = Decimal("16")

    # A picked weight takes precedence over any size parsed from the name.
    picked_weight = to_decimal(item.get("totalPickedWeight"))
    if picked_weight not in missing_or_zero:
        per_lb = line_total / picked_weight
        return "", format_decimal(per_lb), format_decimal(per_lb / ounces_per_pound)

    per_lb_text = ""
    per_oz_text = ""
    parsed_size = to_decimal(size_value)
    if parsed_size not in missing_or_zero and qty_usable:
        # A missing or zero pack quantity counts as a single unit.
        pack_multiplier = to_decimal(pack_qty) or Decimal("1")
        total_units = qty * pack_multiplier * parsed_size
        if size_unit == "lb":
            per_lb = line_total / total_units
            per_lb_text = format_decimal(per_lb)
            per_oz_text = format_decimal(per_lb / ounces_per_pound)
        elif size_unit == "oz":
            per_oz = line_total / total_units
            per_oz_text = format_decimal(per_oz)
            per_lb_text = format_decimal(per_oz * ounces_per_pound)
    return "", per_lb_text, per_oz_text
def derive_normalized_quantity(size_value, size_unit, pack_qty, measure_type):
    """Return ``(quantity, unit)`` describing one sold unit of the item.

    * parsed size with a unit: size * pack quantity, in the size unit
    * measure type "count": the pack quantity, unit "count"
    * measure type "each": ``("1", "each")``
    * otherwise: ``("", "")``
    """
    missing_or_zero = (None, Decimal("0"))
    size = to_decimal(size_value)
    # A missing or zero pack quantity counts as a single unit.
    pack = to_decimal(pack_qty) or Decimal("1")

    if size_unit and size not in missing_or_zero:
        return format_decimal(size * pack), size_unit
    if measure_type == "count" and pack not in missing_or_zero:
        return format_decimal(pack), "count"
    if measure_type == "each":
        return "1", "each"
    return "", ""
def derive_price_fields(price_per_each, price_per_lb, price_per_oz, line_total, qty, pack_qty):
    """Assemble the price columns and their "*_basis" labels for one row.

    ``price_per_count`` is line_total / (qty * pack_qty) and is only filled
    when all three parse to numbers and qty and pack_qty are non-zero. Each
    basis label is empty whenever its price is empty.
    """
    missing_or_zero = (None, Decimal("0"))
    total = to_decimal(line_total)
    quantity = to_decimal(qty)
    pack = to_decimal(pack_qty)

    per_count, per_count_basis = "", ""
    if total is not None and quantity not in missing_or_zero and pack not in missing_or_zero:
        per_count = format_decimal(total / (quantity * pack))
        per_count_basis = "line_total_over_pack_qty"

    each_basis = "line_total_over_qty" if price_per_each else ""
    lb_basis = "parsed_or_picked_weight" if price_per_lb else ""
    oz_basis = "parsed_or_picked_weight" if price_per_oz else ""

    return {
        "price_per_each": price_per_each,
        "price_per_each_basis": each_basis,
        "price_per_count": per_count,
        "price_per_count_basis": per_count_basis,
        "price_per_lb": price_per_lb,
        "price_per_lb_basis": lb_basis,
        "price_per_oz": price_per_oz,
        "price_per_oz_basis": oz_basis,
    }
def normalization_identity(row):
    """Pick the identity key for a row and name the rule that produced it.

    Returns ``(identity_key, basis)``. The first available source wins:
    UPC, retailer item id, normalized name with size/unit/pack, and finally
    the row's own ``normalized_row_id``.
    """
    exact_id_sources = (
        ("upc", "exact_upc"),
        ("retailer_item_id", "exact_retailer_item_id"),
    )
    for field, basis in exact_id_sources:
        value = row.get(field)
        if value:
            return f"{row['retailer']}|{field}={value}", basis

    name = row.get("item_name_norm")
    if name:
        key_parts = (
            row["retailer"],
            f"name={name}",
            f"size={row.get('size_value', '')}",
            f"unit={row.get('size_unit', '')}",
            f"pack={row.get('pack_qty', '')}",
        )
        return "|".join(key_parts), "exact_name_size_pack"

    return row["normalized_row_id"], "row_identity"
def parse_item(order_id, order_date, raw_path, line_no, item):
    """Turn one raw order item into an enriched output row.

    Args:
        order_id: Identifier of the order the item belongs to.
        order_date: Order date as found in the raw payload.
        raw_path: Path of the raw order file; stored in posix form.
        line_no: Position of the item within the order.
        item: The raw item mapping from the order payload.

    Returns:
        A dict of strings whose keys are the OUTPUT_FIELDS columns.
    """

    def raw(field):
        # Raw payload value as text; missing / null becomes "".
        return stringify(item.get(field))

    cleaned_name = clean_item_name(item.get("itemName", ""))
    prefix, brand_guess = extract_store_brand_prefix(cleaned_name)
    size_value, size_unit, pack_qty = parse_size_and_pack(cleaned_name)
    normalized_name = normalize_item_name(cleaned_name)
    measure_type = guess_measure_type(item, size_unit, pack_qty)
    is_fee = is_fee_item(cleaned_name)

    price_per_each, price_per_lb, price_per_oz = derive_prices(
        item,
        measure_type,
        size_value=size_value,
        size_unit=size_unit,
        pack_qty=pack_qty,
    )
    normalized_quantity, normalized_quantity_unit = derive_normalized_quantity(
        size_value,
        size_unit,
        pack_qty,
        measure_type,
    )

    notes = []
    if prefix:
        notes.append(f"store_brand_prefix={prefix}")
    if is_fee:
        notes.append("fee_item")
    if size_value and not size_unit:
        notes.append("size_without_unit")

    row_id = f"{RETAILER}:{order_id}:{line_no}"
    upc = raw("primUpcCd")
    pod_id = raw("podId")
    qty_text = raw("shipQy")
    line_total_text = raw("groceryAmount")

    identity_key, normalization_basis = normalization_identity(
        {
            "retailer": RETAILER,
            "normalized_row_id": row_id,
            "upc": upc,
            "retailer_item_id": pod_id,
            "item_name_norm": normalized_name,
            "size_value": size_value,
            "size_unit": size_unit,
            "pack_qty": pack_qty,
        }
    )
    price_fields = derive_price_fields(
        price_per_each,
        price_per_lb,
        price_per_oz,
        line_total_text,
        qty_text,
        pack_qty,
    )

    row = {
        "retailer": RETAILER,
        "order_id": str(order_id),
        "line_no": str(line_no),
        "normalized_row_id": row_id,
        "normalized_item_id": f"gnorm:{identity_key}",
        "normalization_basis": normalization_basis,
        "observed_item_key": row_id,
        "order_date": normalize_whitespace(order_date),
        # The pod id is emitted under both column names.
        "retailer_item_id": pod_id,
        "pod_id": pod_id,
        "item_name": raw("itemName"),
        "upc": upc,
        "category_id": raw("categoryId"),
        "category": raw("categoryDesc"),
        "qty": qty_text,
        "unit": raw("lbEachCd"),
        "unit_price": raw("unitPrice"),
        "line_total": line_total_text,
        "picked_weight": raw("totalPickedWeight"),
        "mvp_savings": raw("mvpSavings"),
        "reward_savings": raw("rewardSavings"),
        "coupon_savings": raw("couponSavings"),
        "coupon_price": raw("couponPrice"),
        "matched_discount_amount": "",
        "net_line_total": raw("totalPrice"),
        "image_url": extract_image_url(item),
        "raw_order_path": raw_path.as_posix(),
        "item_name_norm": normalized_name,
        "brand_guess": brand_guess,
        "variant": "",
        "size_value": size_value,
        "size_unit": size_unit,
        "pack_qty": pack_qty,
        "measure_type": measure_type,
        "normalized_quantity": normalized_quantity,
        "normalized_quantity_unit": normalized_quantity_unit,
        "is_store_brand": "true" if prefix else "false",
        "is_item": "false" if is_fee else "true",
        "is_fee": "true" if is_fee else "false",
        "is_discount_line": "false",
        "is_coupon_line": "false",
    }
    row.update(price_fields)
    row["parse_version"] = PARSER_VERSION
    row["parse_notes"] = ";".join(notes)
    return row
def stringify(value):
    """Return ``str(value)``, with ``None`` mapped to the empty string."""
    return "" if value is None else str(value)
def iter_order_rows(raw_dir):
    """Yield one enriched row per item of every raw order file in ``raw_dir``.

    Files are visited in sorted path order and "history.json" is skipped.
    Item line numbers start at 1 within each order.

    Args:
        raw_dir: ``pathlib.Path`` of the directory holding ``*.json`` orders.

    Yields:
        The dict produced by ``parse_item`` for each item.

    Raises:
        json.JSONDecodeError: If a file does not contain valid JSON.
    """
    for path in sorted(raw_dir.glob("*.json")):
        if path.name == "history.json":
            continue
        payload = json.loads(path.read_text(encoding="utf-8"))

        # dict.get only applies its default when the key is absent. An
        # explicit JSON null would otherwise become the order id "None".
        order_id = payload.get("orderId")
        if order_id is None:
            order_id = path.stem
        order_date = payload.get("orderDate", "")

        # Likewise `"items": null` must behave like a missing list rather
        # than crash in enumerate(None).
        items = payload.get("items") or []
        for line_no, item in enumerate(items, start=1):
            yield parse_item(order_id, order_date, path, line_no, item)
def build_items_enriched(raw_dir):
    """Return all enriched rows from ``raw_dir`` as a sorted list.

    Rows are ordered by order date text, then order id, then numeric line
    number.
    """

    def sort_key(row):
        return row["order_date"], row["order_id"], int(row["line_no"])

    return sorted(iter_order_rows(raw_dir), key=sort_key)
def write_csv(path, rows):
    """Write ``rows`` to ``path`` as UTF-8 CSV with an OUTPUT_FIELDS header.

    Missing parent directories are created; an existing file is overwritten.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" lets the csv module control line endings itself.
    with path.open("w", newline="", encoding="utf-8") as out_file:
        writer = csv.DictWriter(out_file, fieldnames=OUTPUT_FIELDS)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)
# Command line entry point: reads raw order JSON from --input-dir and writes
# the enriched item rows to --output-csv.
# (Documented with comments rather than a docstring because click would show
# a docstring as the command's --help text.)
@click.command()
@click.option(
    "--input-dir",
    default=str(DEFAULT_INPUT_DIR),
    show_default=True,
    help="Directory containing Giant raw order json files.",
)
@click.option(
    "--output-csv",
    default=str(DEFAULT_OUTPUT_CSV),
    show_default=True,
    help="CSV path for enriched Giant item rows.",
)
def main(input_dir, output_csv):
    # The script announces on every run that it is a legacy entry point.
    click.echo("legacy entrypoint: prefer normalize_giant_web.py for data-model outputs")
    raw_dir = Path(input_dir)
    output_path = Path(output_csv)
    # Fail with a click error message instead of silently writing an empty CSV.
    if not raw_dir.exists():
        raise click.ClickException(f"input dir does not exist: {raw_dir}")
    rows = build_items_enriched(raw_dir)
    write_csv(output_path, rows)
    click.echo(f"wrote {len(rows)} rows to {output_path}")


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()