diff --git a/enrich_costco.py b/enrich_costco.py index 4a4d358..78258a0 100644 --- a/enrich_costco.py +++ b/enrich_costco.py @@ -8,7 +8,10 @@ import click from enrich_giant import ( OUTPUT_FIELDS, + derive_normalized_quantity, + derive_price_fields, format_decimal, + normalization_identity, normalize_number, normalize_unit, normalize_whitespace, @@ -177,12 +180,42 @@ def parse_costco_item(order_id, order_date, raw_path, line_no, item): price_per_each, price_per_lb, price_per_oz = derive_costco_prices( item, measure_type, size_value, size_unit, pack_qty ) + normalized_row_id = f"{RETAILER}:{order_id}:{line_no}" + normalized_quantity, normalized_quantity_unit = derive_normalized_quantity( + size_value, + size_unit, + pack_qty, + measure_type, + ) + identity_key, normalization_basis = normalization_identity( + { + "retailer": RETAILER, + "normalized_row_id": normalized_row_id, + "upc": "", + "retailer_item_id": str(item.get("itemNumber", "")), + "item_name_norm": item_name_norm, + "size_value": size_value, + "size_unit": size_unit, + "pack_qty": pack_qty, + } + ) + price_fields = derive_price_fields( + price_per_each, + price_per_lb, + price_per_oz, + str(item.get("amount", "")), + str(item.get("unit", "")), + pack_qty, + ) return { "retailer": RETAILER, "order_id": str(order_id), "line_no": str(line_no), - "observed_item_key": f"{RETAILER}:{order_id}:{line_no}", + "normalized_row_id": normalized_row_id, + "normalized_item_id": f"cnorm:{identity_key}", + "normalization_basis": normalization_basis, + "observed_item_key": normalized_row_id, "order_date": normalize_whitespace(order_date), "retailer_item_id": str(item.get("itemNumber", "")), "pod_id": "", @@ -210,13 +243,14 @@ def parse_costco_item(order_id, order_date, raw_path, line_no, item): "size_unit": size_unit, "pack_qty": pack_qty, "measure_type": measure_type, + "normalized_quantity": normalized_quantity, + "normalized_quantity_unit": normalized_quantity_unit, "is_store_brand": "true" if brand_guess else "false", + "is_item": "false" if is_discount_line else "true", "is_fee": "false", "is_discount_line": "true" if is_discount_line else "false", "is_coupon_line": is_coupon_line, - "price_per_each": price_per_each, - "price_per_lb": price_per_lb, - "price_per_oz": price_per_oz, + **price_fields, "parse_version": PARSER_VERSION, "parse_notes": "", } @@ -321,6 +355,7 @@ def write_csv(path, rows): help="CSV path for enriched Costco item rows.", ) def main(input_dir, output_csv): + click.echo("legacy entrypoint: prefer normalize_costco_web.py for data-model outputs") rows = build_items_enriched(Path(input_dir)) write_csv(Path(output_csv), rows) click.echo(f"wrote {len(rows)} rows to {output_csv}") diff --git a/enrich_giant.py b/enrich_giant.py index 0ea7567..c40ab58 100644 --- a/enrich_giant.py +++ b/enrich_giant.py @@ -16,6 +16,9 @@ OUTPUT_FIELDS = [ "retailer", "order_id", "line_no", + "normalized_row_id", + "normalized_item_id", + "normalization_basis", "observed_item_key", "order_date", "retailer_item_id", @@ -44,13 +47,21 @@ OUTPUT_FIELDS = [ "size_unit", "pack_qty", "measure_type", + "normalized_quantity", + "normalized_quantity_unit", "is_store_brand", + "is_item", "is_fee", "is_discount_line", "is_coupon_line", "price_per_each", + "price_per_each_basis", + "price_per_count", + "price_per_count_basis", "price_per_lb", + "price_per_lb_basis", "price_per_oz", + "price_per_oz_basis", "parse_version", "parse_notes", ] @@ -329,6 +340,65 @@ def derive_prices(item, measure_type, size_value="", size_unit="", pack_qty=""): return price_per_each, price_per_lb, price_per_oz +def derive_normalized_quantity(size_value, size_unit, pack_qty, measure_type): + parsed_size = to_decimal(size_value) + parsed_pack = to_decimal(pack_qty) or Decimal("1") + + if parsed_size not in (None, Decimal("0")) and size_unit: + return format_decimal(parsed_size * parsed_pack), size_unit + if parsed_pack not in (None, Decimal("0")) and measure_type == "count": + return format_decimal(parsed_pack), "count" + if measure_type == "each": + return "1", "each" + return "", "" + + +def derive_price_fields(price_per_each, price_per_lb, price_per_oz, line_total, qty, pack_qty): + line_total_decimal = to_decimal(line_total) + qty_decimal = to_decimal(qty) + pack_decimal = to_decimal(pack_qty) + price_per_count = "" + price_per_count_basis = "" + if line_total_decimal is not None and qty_decimal not in (None, Decimal("0")) and pack_decimal not in ( + None, + Decimal("0"), + ): + price_per_count = format_decimal(line_total_decimal / (qty_decimal * pack_decimal)) + price_per_count_basis = "line_total_over_pack_qty" + + return { + "price_per_each": price_per_each, + "price_per_each_basis": "line_total_over_qty" if price_per_each else "", + "price_per_count": price_per_count, + "price_per_count_basis": price_per_count_basis, + "price_per_lb": price_per_lb, + "price_per_lb_basis": "parsed_or_picked_weight" if price_per_lb else "", + "price_per_oz": price_per_oz, + "price_per_oz_basis": "parsed_or_picked_weight" if price_per_oz else "", + } + + +def normalization_identity(row): + if row.get("upc"): + return f"{row['retailer']}|upc={row['upc']}", "exact_upc" + if row.get("retailer_item_id"): + return f"{row['retailer']}|retailer_item_id={row['retailer_item_id']}", "exact_retailer_item_id" + if row.get("item_name_norm"): + return ( + "|".join( + [ + row["retailer"], + f"name={row['item_name_norm']}", + f"size={row.get('size_value', '')}", + f"unit={row.get('size_unit', '')}", + f"pack={row.get('pack_qty', '')}", + ] + ), + "exact_name_size_pack", + ) + return row["normalized_row_id"], "row_identity" + + def parse_item(order_id, order_date, raw_path, line_no, item): cleaned_name = clean_item_name(item.get("itemName", "")) size_value, size_unit, pack_qty = parse_size_and_pack(cleaned_name) @@ -352,11 +422,42 @@ def parse_item(order_id, order_date, raw_path, line_no, item): if size_value and not size_unit: parse_notes.append("size_without_unit") + normalized_row_id = f"{RETAILER}:{order_id}:{line_no}" + normalized_quantity, normalized_quantity_unit = derive_normalized_quantity( + size_value, + size_unit, + pack_qty, + measure_type, + ) + identity_key, normalization_basis = normalization_identity( + { + "retailer": RETAILER, + "normalized_row_id": normalized_row_id, + "upc": stringify(item.get("primUpcCd")), + "retailer_item_id": stringify(item.get("podId")), + "item_name_norm": normalized_name, + "size_value": size_value, + "size_unit": size_unit, + "pack_qty": pack_qty, + } + ) + price_fields = derive_price_fields( + price_per_each, + price_per_lb, + price_per_oz, + stringify(item.get("groceryAmount")), + stringify(item.get("shipQy")), + pack_qty, + ) + return { "retailer": RETAILER, "order_id": str(order_id), "line_no": str(line_no), - "observed_item_key": f"{RETAILER}:{order_id}:{line_no}", + "normalized_row_id": normalized_row_id, + "normalized_item_id": f"gnorm:{identity_key}", + "normalization_basis": normalization_basis, + "observed_item_key": normalized_row_id, "order_date": normalize_whitespace(order_date), "retailer_item_id": stringify(item.get("podId")), "pod_id": stringify(item.get("podId")), @@ -384,13 +485,14 @@ def parse_item(order_id, order_date, raw_path, line_no, item): "size_unit": size_unit, "pack_qty": pack_qty, "measure_type": measure_type, + "normalized_quantity": normalized_quantity, + "normalized_quantity_unit": normalized_quantity_unit, "is_store_brand": "true" if bool(prefix) else "false", + "is_item": "false" if is_fee else "true", "is_fee": "true" if is_fee else "false", "is_discount_line": "false", "is_coupon_line": "false", - "price_per_each": price_per_each, - "price_per_lb": price_per_lb, - "price_per_oz": price_per_oz, + **price_fields, "parse_version": PARSER_VERSION, "parse_notes": ";".join(parse_notes), } @@ -443,6 +545,7 @@ def write_csv(path, rows): help="CSV path for enriched Giant item rows.", ) def main(input_dir, output_csv): + click.echo("legacy entrypoint: prefer normalize_giant_web.py for data-model outputs") raw_dir = Path(input_dir) output_path = Path(output_csv) diff --git a/normalize_costco_web.py b/normalize_costco_web.py new file mode 100644 index 0000000..305fc80 --- /dev/null +++ b/normalize_costco_web.py @@ -0,0 +1,28 @@ +from pathlib import Path + +import click + +import enrich_costco + + +@click.command() +@click.option( + "--input-dir", + default="data/costco-web/raw", + show_default=True, + help="Directory containing Costco raw order json files.", +) +@click.option( + "--output-csv", + default="data/costco-web/normalized_items.csv", + show_default=True, + help="CSV path for normalized Costco item rows.", +) +def main(input_dir, output_csv): + rows = enrich_costco.build_items_enriched(Path(input_dir)) + enrich_costco.write_csv(Path(output_csv), rows) + click.echo(f"wrote {len(rows)} rows to {output_csv}") + + +if __name__ == "__main__": + main() diff --git a/normalize_giant_web.py b/normalize_giant_web.py new file mode 100644 index 0000000..0b34baa --- /dev/null +++ b/normalize_giant_web.py @@ -0,0 +1,28 @@ +from pathlib import Path + +import click + +import enrich_giant + + +@click.command() +@click.option( + "--input-dir", + default="data/giant-web/raw", + show_default=True, + help="Directory containing Giant raw order json files.", +) +@click.option( + "--output-csv", + default="data/giant-web/normalized_items.csv", + show_default=True, + help="CSV path for normalized Giant item rows.", +) +def main(input_dir, output_csv): + rows = enrich_giant.build_items_enriched(Path(input_dir)) + enrich_giant.write_csv(Path(output_csv), rows) + click.echo(f"wrote {len(rows)} rows to {output_csv}") + + +if __name__ == "__main__": + main() diff --git a/pm/tasks.org b/pm/tasks.org index 016723c..19b96ef 100644 --- a/pm/tasks.org +++ b/pm/tasks.org @@ -502,7 +502,7 @@ move Giant and Costco collection into the new collect structure and make both re - Added lightweight deprecation nudges on the legacy `scrape_*` commands rather than removing them immediately, so the move is inspectable and low-risk. - The main schema fix was on Giant collection, which was missing retailer/provenance/audit fields that Costco collection already carried. -* [ ] t1.14.1: refactor retailer normalization into the new normalized_items schema (3-5 commits) +* [X] t1.14.1: refactor retailer normalization into the new normalized_items schema (3-5 commits) make Giant and Costco emit the shared normalized line-item schema without introducing cross-retailer identity logic ** Acceptance Criteria @@ -538,10 +538,13 @@ make Giant and Costco emit the shared normalized line-item schema without introd - pm note: normalized_item_id is the only retailer-level grouping identity; do not introduce observed_products or a second grouping artifact ** evidence - commit: -- tests: -- datetime: +- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python -m unittest tests.test_enrich_giant tests.test_costco_pipeline tests.test_purchases`; `./venv/bin/python normalize_giant_web.py --help`; `./venv/bin/python normalize_costco_web.py --help`; `./venv/bin/python enrich_giant.py --help`; `./venv/bin/python enrich_costco.py --help` +- datetime: 2026-03-18 ** notes +- Kept the existing Giant and Costco parsing logic intact and added the new normalized schema fields in place, rather than rewriting the enrichers from scratch. +- `normalized_item_id` is always present, but it only collapses repeated rows when the evidence is strong; otherwise it falls back to row-level identity via `normalized_row_id`. +- Added `normalize_*` entry points for the new data-model layout while leaving the legacy `enrich_*` commands available during the transition. * [ ] t1.15: refactor review/combine pipeline around normalized_item_id and catalog links (4-8 commits) replace the old observed/canonical workflow with a review-first pipeline that uses normalized_item_id as the retailer-level review unit and links it to catalog items diff --git a/tests/test_costco_pipeline.py b/tests/test_costco_pipeline.py index 47b0364..50f141f 100644 --- a/tests/test_costco_pipeline.py +++ b/tests/test_costco_pipeline.py @@ -258,6 +258,11 @@ class CostcoPipelineTests(unittest.TestCase): self.assertEqual("MIXED PEPPER", row["item_name_norm"]) self.assertEqual("6", row["pack_qty"]) self.assertEqual("count", row["measure_type"]) + self.assertEqual("costco:abc:1", row["normalized_row_id"]) + self.assertEqual("exact_retailer_item_id", row["normalization_basis"]) + self.assertTrue(row["normalized_item_id"]) + self.assertEqual("6", row["normalized_quantity"]) + self.assertEqual("count", row["normalized_quantity_unit"]) discount = enrich_costco.parse_costco_item( order_id="abc", @@ -278,6 +283,7 @@ class CostcoPipelineTests(unittest.TestCase): ) self.assertEqual("true", discount["is_discount_line"]) self.assertEqual("true", discount["is_coupon_line"]) + self.assertEqual("false", discount["is_item"]) def test_build_items_enriched_matches_discount_to_item(self): with tempfile.TemporaryDirectory() as tmpdir: diff --git a/tests/test_enrich_giant.py b/tests/test_enrich_giant.py index 39a34ff..75c1295 100644 --- a/tests/test_enrich_giant.py +++ b/tests/test_enrich_giant.py @@ -51,6 +51,11 @@ class EnrichGiantTests(unittest.TestCase): self.assertEqual("1.99", row["price_per_lb"]) self.assertEqual("0.1244", row["price_per_oz"]) self.assertEqual("https://example.test/apple.jpg", row["image_url"]) + self.assertEqual("giant:abc123:1", row["normalized_row_id"]) + self.assertEqual("exact_upc", row["normalization_basis"]) + self.assertEqual("5", row["normalized_quantity"]) + self.assertEqual("lb", row["normalized_quantity_unit"]) + self.assertEqual("true", row["is_item"]) fee_row = enrich_giant.parse_item( order_id="abc123", @@ -77,6 +82,7 @@ class EnrichGiantTests(unittest.TestCase): self.assertEqual("true", fee_row["is_fee"]) self.assertEqual("GL BAG CHARGE", fee_row["item_name_norm"]) + self.assertEqual("false", fee_row["is_item"]) def test_parse_item_derives_packaged_weight_prices_from_size_tokens(self): row = enrich_giant.parse_item( @@ -179,6 +185,8 @@ class EnrichGiantTests(unittest.TestCase): self.assertEqual("7.5", rows[0]["size_value"]) self.assertEqual("10", rows[0]["retailer_item_id"]) self.assertEqual("true", rows[1]["is_store_brand"]) + self.assertTrue(rows[0]["normalized_item_id"]) + self.assertEqual("exact_upc", rows[0]["normalization_basis"]) with output_csv.open(newline="", encoding="utf-8") as handle: written_rows = list(csv.DictReader(handle))