Refactor review pipeline around normalized items
This commit is contained in:
@@ -3,11 +3,8 @@ from pathlib import Path
|
||||
|
||||
import click
|
||||
|
||||
import build_canonical_layer
|
||||
import build_observed_products
|
||||
import validate_cross_retailer_flow
|
||||
from enrich_giant import format_decimal, to_decimal
|
||||
from layer_helpers import read_csv_rows, stable_id, write_csv_rows
|
||||
from layer_helpers import read_csv_rows, write_csv_rows
|
||||
|
||||
|
||||
PURCHASE_FIELDS = [
|
||||
@@ -15,13 +12,18 @@ PURCHASE_FIELDS = [
|
||||
"retailer",
|
||||
"order_id",
|
||||
"line_no",
|
||||
"observed_item_key",
|
||||
"observed_product_id",
|
||||
"canonical_product_id",
|
||||
"normalized_row_id",
|
||||
"normalized_item_id",
|
||||
"catalog_id",
|
||||
"review_status",
|
||||
"resolution_action",
|
||||
"raw_item_name",
|
||||
"normalized_item_name",
|
||||
"catalog_name",
|
||||
"category",
|
||||
"product_type",
|
||||
"brand",
|
||||
"variant",
|
||||
"image_url",
|
||||
"retailer_item_id",
|
||||
"upc",
|
||||
@@ -55,7 +57,7 @@ PURCHASE_FIELDS = [
|
||||
|
||||
EXAMPLE_FIELDS = [
|
||||
"example_name",
|
||||
"canonical_product_id",
|
||||
"catalog_id",
|
||||
"giant_purchase_date",
|
||||
"giant_raw_item_name",
|
||||
"giant_price_per_lb",
|
||||
@@ -66,8 +68,8 @@ EXAMPLE_FIELDS = [
|
||||
]
|
||||
|
||||
CATALOG_FIELDS = [
|
||||
"canonical_product_id",
|
||||
"canonical_name",
|
||||
"catalog_id",
|
||||
"catalog_name",
|
||||
"category",
|
||||
"product_type",
|
||||
"brand",
|
||||
@@ -81,9 +83,20 @@ CATALOG_FIELDS = [
|
||||
"updated_at",
|
||||
]
|
||||
|
||||
PRODUCT_LINK_FIELDS = [
|
||||
"normalized_item_id",
|
||||
"catalog_id",
|
||||
"link_method",
|
||||
"link_confidence",
|
||||
"review_status",
|
||||
"reviewed_by",
|
||||
"reviewed_at",
|
||||
"link_notes",
|
||||
]
|
||||
|
||||
RESOLUTION_FIELDS = [
|
||||
"observed_product_id",
|
||||
"canonical_product_id",
|
||||
"normalized_item_id",
|
||||
"catalog_id",
|
||||
"resolution_action",
|
||||
"status",
|
||||
"resolution_notes",
|
||||
@@ -91,10 +104,6 @@ RESOLUTION_FIELDS = [
|
||||
]
|
||||
|
||||
|
||||
def decimal_or_zero(value):
|
||||
return to_decimal(value) or Decimal("0")
|
||||
|
||||
|
||||
def derive_metrics(row):
|
||||
line_total = to_decimal(row.get("net_line_total") or row.get("line_total"))
|
||||
qty = to_decimal(row.get("qty"))
|
||||
@@ -162,10 +171,7 @@ def derive_metrics(row):
|
||||
|
||||
|
||||
def order_lookup(rows, retailer):
|
||||
return {
|
||||
(retailer, row["order_id"]): row
|
||||
for row in rows
|
||||
}
|
||||
return {(retailer, row["order_id"]): row for row in rows}
|
||||
|
||||
|
||||
def read_optional_csv_rows(path):
|
||||
@@ -175,28 +181,10 @@ def read_optional_csv_rows(path):
|
||||
return read_csv_rows(path)
|
||||
|
||||
|
||||
def load_resolution_lookup(resolution_rows):
|
||||
lookup = {}
|
||||
for row in resolution_rows:
|
||||
if not row.get("observed_product_id"):
|
||||
continue
|
||||
lookup[row["observed_product_id"]] = row
|
||||
return lookup
|
||||
|
||||
|
||||
def merge_catalog_rows(existing_rows, auto_rows):
|
||||
merged = {}
|
||||
for row in auto_rows + existing_rows:
|
||||
canonical_product_id = row.get("canonical_product_id", "")
|
||||
if canonical_product_id:
|
||||
merged[canonical_product_id] = row
|
||||
return sorted(merged.values(), key=lambda row: row["canonical_product_id"])
|
||||
|
||||
|
||||
def catalog_row_from_canonical(row):
|
||||
def normalize_catalog_row(row):
|
||||
return {
|
||||
"canonical_product_id": row.get("canonical_product_id", ""),
|
||||
"canonical_name": row.get("canonical_name", ""),
|
||||
"catalog_id": row.get("catalog_id") or row.get("canonical_product_id", ""),
|
||||
"catalog_name": row.get("catalog_name") or row.get("canonical_name", ""),
|
||||
"category": row.get("category", ""),
|
||||
"product_type": row.get("product_type", ""),
|
||||
"brand": row.get("brand", ""),
|
||||
@@ -211,24 +199,67 @@ def catalog_row_from_canonical(row):
|
||||
}
|
||||
|
||||
|
||||
def build_link_state(enriched_rows):
|
||||
observed_rows = build_observed_products.build_observed_products(enriched_rows)
|
||||
canonical_rows, link_rows = build_canonical_layer.build_canonical_layer(observed_rows)
|
||||
giant_row, costco_row = validate_cross_retailer_flow.find_proof_pair(observed_rows)
|
||||
canonical_rows, link_rows, _proof_rows = validate_cross_retailer_flow.merge_proof_pair(
|
||||
canonical_rows,
|
||||
link_rows,
|
||||
giant_row,
|
||||
costco_row,
|
||||
)
|
||||
def is_review_first_catalog_row(row):
|
||||
notes = row.get("notes", "").strip().lower()
|
||||
if notes.startswith("auto-linked via"):
|
||||
return False
|
||||
return True
|
||||
|
||||
observed_id_by_key = {
|
||||
row["observed_key"]: row["observed_product_id"] for row in observed_rows
|
||||
|
||||
def normalize_link_row(row):
|
||||
return {
|
||||
"normalized_item_id": row.get("normalized_item_id", ""),
|
||||
"catalog_id": row.get("catalog_id") or row.get("canonical_product_id", ""),
|
||||
"link_method": row.get("link_method", ""),
|
||||
"link_confidence": row.get("link_confidence", ""),
|
||||
"review_status": row.get("review_status", ""),
|
||||
"reviewed_by": row.get("reviewed_by", ""),
|
||||
"reviewed_at": row.get("reviewed_at", ""),
|
||||
"link_notes": row.get("link_notes", ""),
|
||||
}
|
||||
canonical_id_by_observed = {
|
||||
row["observed_product_id"]: row["canonical_product_id"] for row in link_rows
|
||||
|
||||
|
||||
def normalize_resolution_row(row):
|
||||
return {
|
||||
"normalized_item_id": row.get("normalized_item_id", ""),
|
||||
"catalog_id": row.get("catalog_id") or row.get("canonical_product_id", ""),
|
||||
"resolution_action": row.get("resolution_action", ""),
|
||||
"status": row.get("status", ""),
|
||||
"resolution_notes": row.get("resolution_notes", ""),
|
||||
"reviewed_at": row.get("reviewed_at", ""),
|
||||
}
|
||||
return observed_rows, canonical_rows, link_rows, observed_id_by_key, canonical_id_by_observed
|
||||
|
||||
|
||||
def load_resolution_lookup(resolution_rows):
|
||||
lookup = {}
|
||||
for row in resolution_rows:
|
||||
normalized_row = normalize_resolution_row(row)
|
||||
normalized_item_id = normalized_row.get("normalized_item_id", "")
|
||||
if not normalized_item_id:
|
||||
continue
|
||||
lookup[normalized_item_id] = normalized_row
|
||||
return lookup
|
||||
|
||||
|
||||
def merge_catalog_rows(existing_rows, new_rows):
|
||||
merged = {}
|
||||
for row in existing_rows + new_rows:
|
||||
normalized_row = normalize_catalog_row(row)
|
||||
catalog_id = normalized_row.get("catalog_id", "")
|
||||
if catalog_id:
|
||||
merged[catalog_id] = normalized_row
|
||||
return sorted(merged.values(), key=lambda row: row["catalog_id"])
|
||||
|
||||
|
||||
def load_link_lookup(link_rows):
|
||||
lookup = {}
|
||||
for row in link_rows:
|
||||
normalized_row = normalize_link_row(row)
|
||||
normalized_item_id = normalized_row.get("normalized_item_id", "")
|
||||
if not normalized_item_id:
|
||||
continue
|
||||
lookup[normalized_item_id] = normalized_row
|
||||
return lookup
|
||||
|
||||
|
||||
def build_purchase_rows(
|
||||
@@ -237,25 +268,37 @@ def build_purchase_rows(
|
||||
giant_orders,
|
||||
costco_orders,
|
||||
resolution_rows,
|
||||
link_rows=None,
|
||||
catalog_rows=None,
|
||||
):
|
||||
all_enriched_rows = giant_enriched_rows + costco_enriched_rows
|
||||
(
|
||||
observed_rows,
|
||||
canonical_rows,
|
||||
link_rows,
|
||||
observed_id_by_key,
|
||||
canonical_id_by_observed,
|
||||
) = build_link_state(all_enriched_rows)
|
||||
resolution_lookup = load_resolution_lookup(resolution_rows)
|
||||
for observed_product_id, resolution in resolution_lookup.items():
|
||||
link_lookup = load_link_lookup(link_rows or [])
|
||||
catalog_lookup = {
|
||||
row["catalog_id"]: normalize_catalog_row(row)
|
||||
for row in (catalog_rows or [])
|
||||
if normalize_catalog_row(row).get("catalog_id")
|
||||
}
|
||||
|
||||
for normalized_item_id, resolution in resolution_lookup.items():
|
||||
action = resolution.get("resolution_action", "")
|
||||
status = resolution.get("status", "")
|
||||
if status != "approved":
|
||||
continue
|
||||
if action in {"link", "create"} and resolution.get("canonical_product_id"):
|
||||
canonical_id_by_observed[observed_product_id] = resolution["canonical_product_id"]
|
||||
if action in {"link", "create"} and resolution.get("catalog_id"):
|
||||
link_lookup[normalized_item_id] = {
|
||||
"normalized_item_id": normalized_item_id,
|
||||
"catalog_id": resolution["catalog_id"],
|
||||
"link_method": f"manual_{action}",
|
||||
"link_confidence": "high",
|
||||
"review_status": status,
|
||||
"reviewed_by": "",
|
||||
"reviewed_at": resolution.get("reviewed_at", ""),
|
||||
"link_notes": resolution.get("resolution_notes", ""),
|
||||
}
|
||||
elif action == "exclude":
|
||||
canonical_id_by_observed[observed_product_id] = ""
|
||||
link_lookup.pop(normalized_item_id, None)
|
||||
|
||||
orders_by_id = {}
|
||||
orders_by_id.update(order_lookup(giant_orders, "giant"))
|
||||
orders_by_id.update(order_lookup(costco_orders, "costco"))
|
||||
@@ -265,24 +308,30 @@ def build_purchase_rows(
|
||||
all_enriched_rows,
|
||||
key=lambda item: (item["order_date"], item["retailer"], item["order_id"], int(item["line_no"])),
|
||||
):
|
||||
observed_key = build_observed_products.build_observed_key(row)
|
||||
observed_product_id = observed_id_by_key.get(observed_key, "")
|
||||
normalized_item_id = row.get("normalized_item_id", "")
|
||||
resolution = resolution_lookup.get(normalized_item_id, {})
|
||||
link_row = link_lookup.get(normalized_item_id, {})
|
||||
catalog_row = catalog_lookup.get(link_row.get("catalog_id", ""), {})
|
||||
order_row = orders_by_id.get((row["retailer"], row["order_id"]), {})
|
||||
metrics = derive_metrics(row)
|
||||
resolution = resolution_lookup.get(observed_product_id, {})
|
||||
purchase_rows.append(
|
||||
{
|
||||
"purchase_date": row["order_date"],
|
||||
"retailer": row["retailer"],
|
||||
"order_id": row["order_id"],
|
||||
"line_no": row["line_no"],
|
||||
"observed_item_key": row["observed_item_key"],
|
||||
"observed_product_id": observed_product_id,
|
||||
"canonical_product_id": canonical_id_by_observed.get(observed_product_id, ""),
|
||||
"normalized_row_id": row.get("normalized_row_id", ""),
|
||||
"normalized_item_id": normalized_item_id,
|
||||
"catalog_id": link_row.get("catalog_id", ""),
|
||||
"review_status": resolution.get("status", ""),
|
||||
"resolution_action": resolution.get("resolution_action", ""),
|
||||
"raw_item_name": row["item_name"],
|
||||
"normalized_item_name": row["item_name_norm"],
|
||||
"catalog_name": catalog_row.get("catalog_name", ""),
|
||||
"category": catalog_row.get("category", ""),
|
||||
"product_type": catalog_row.get("product_type", ""),
|
||||
"brand": catalog_row.get("brand", ""),
|
||||
"variant": catalog_row.get("variant", ""),
|
||||
"image_url": row.get("image_url", ""),
|
||||
"retailer_item_id": row["retailer_item_id"],
|
||||
"upc": row["upc"],
|
||||
@@ -307,33 +356,7 @@ def build_purchase_rows(
|
||||
**metrics,
|
||||
}
|
||||
)
|
||||
return purchase_rows, observed_rows, canonical_rows, link_rows
|
||||
|
||||
|
||||
def apply_manual_resolutions_to_links(link_rows, resolution_rows):
|
||||
link_by_observed = {row["observed_product_id"]: dict(row) for row in link_rows}
|
||||
for resolution in resolution_rows:
|
||||
if resolution.get("status") != "approved":
|
||||
continue
|
||||
observed_product_id = resolution.get("observed_product_id", "")
|
||||
action = resolution.get("resolution_action", "")
|
||||
if not observed_product_id:
|
||||
continue
|
||||
if action == "exclude":
|
||||
link_by_observed.pop(observed_product_id, None)
|
||||
continue
|
||||
if action in {"link", "create"} and resolution.get("canonical_product_id"):
|
||||
link_by_observed[observed_product_id] = {
|
||||
"observed_product_id": observed_product_id,
|
||||
"canonical_product_id": resolution["canonical_product_id"],
|
||||
"link_method": f"manual_{action}",
|
||||
"link_confidence": "high",
|
||||
"review_status": resolution.get("status", ""),
|
||||
"reviewed_by": "",
|
||||
"reviewed_at": resolution.get("reviewed_at", ""),
|
||||
"link_notes": resolution.get("resolution_notes", ""),
|
||||
}
|
||||
return sorted(link_by_observed.values(), key=lambda row: row["observed_product_id"])
|
||||
return purchase_rows, sorted(link_lookup.values(), key=lambda row: row["normalized_item_id"])
|
||||
|
||||
|
||||
def build_comparison_examples(purchase_rows):
|
||||
@@ -342,7 +365,7 @@ def build_comparison_examples(purchase_rows):
|
||||
for row in purchase_rows:
|
||||
if row.get("normalized_item_name") != "BANANA":
|
||||
continue
|
||||
if not row.get("canonical_product_id"):
|
||||
if not row.get("catalog_id"):
|
||||
continue
|
||||
if row["retailer"] == "giant" and row.get("price_per_lb"):
|
||||
giant_banana = row
|
||||
@@ -355,7 +378,7 @@ def build_comparison_examples(purchase_rows):
|
||||
return [
|
||||
{
|
||||
"example_name": "banana_price_per_lb",
|
||||
"canonical_product_id": giant_banana["canonical_product_id"],
|
||||
"catalog_id": giant_banana["catalog_id"],
|
||||
"giant_purchase_date": giant_banana["purchase_date"],
|
||||
"giant_raw_item_name": giant_banana["raw_item_name"],
|
||||
"giant_price_per_lb": giant_banana["price_per_lb"],
|
||||
@@ -389,27 +412,29 @@ def main(
|
||||
examples_csv,
|
||||
):
|
||||
resolution_rows = read_optional_csv_rows(resolutions_csv)
|
||||
purchase_rows, _observed_rows, canonical_rows, link_rows = build_purchase_rows(
|
||||
catalog_rows = merge_catalog_rows(
|
||||
[row for row in read_optional_csv_rows(catalog_csv) if is_review_first_catalog_row(row)],
|
||||
[],
|
||||
)
|
||||
existing_links = [normalize_link_row(row) for row in read_optional_csv_rows(links_csv)]
|
||||
purchase_rows, link_rows = build_purchase_rows(
|
||||
read_csv_rows(giant_items_enriched_csv),
|
||||
read_csv_rows(costco_items_enriched_csv),
|
||||
read_csv_rows(giant_orders_csv),
|
||||
read_csv_rows(costco_orders_csv),
|
||||
resolution_rows,
|
||||
existing_links,
|
||||
catalog_rows,
|
||||
)
|
||||
existing_catalog_rows = read_optional_csv_rows(catalog_csv)
|
||||
merged_catalog_rows = merge_catalog_rows(
|
||||
existing_catalog_rows,
|
||||
[catalog_row_from_canonical(row) for row in canonical_rows],
|
||||
)
|
||||
link_rows = apply_manual_resolutions_to_links(link_rows, resolution_rows)
|
||||
example_rows = build_comparison_examples(purchase_rows)
|
||||
write_csv_rows(catalog_csv, merged_catalog_rows, CATALOG_FIELDS)
|
||||
write_csv_rows(links_csv, link_rows, build_canonical_layer.LINK_FIELDS)
|
||||
write_csv_rows(catalog_csv, catalog_rows, CATALOG_FIELDS)
|
||||
write_csv_rows(links_csv, link_rows, PRODUCT_LINK_FIELDS)
|
||||
write_csv_rows(output_csv, purchase_rows, PURCHASE_FIELDS)
|
||||
write_csv_rows(examples_csv, example_rows, EXAMPLE_FIELDS)
|
||||
click.echo(
|
||||
f"wrote {len(purchase_rows)} purchase rows to {output_csv}, "
|
||||
f"{len(merged_catalog_rows)} catalog rows to {catalog_csv}, "
|
||||
f"{len(catalog_rows)} catalog rows to {catalog_csv}, "
|
||||
f"{len(link_rows)} product links to {links_csv}, "
|
||||
f"and {len(example_rows)} comparison examples to {examples_csv}"
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user