Add terminal review resolution workflow

This commit is contained in:
ben
2026-03-16 20:45:37 -04:00
parent 34eedff9c5
commit c7dad5489e
5 changed files with 597 additions and 9 deletions

View File

@@ -7,7 +7,7 @@ import build_canonical_layer
import build_observed_products
import validate_cross_retailer_flow
from enrich_giant import format_decimal, to_decimal
from layer_helpers import read_csv_rows, write_csv_rows
from layer_helpers import read_csv_rows, stable_id, write_csv_rows
PURCHASE_FIELDS = [
@@ -18,6 +18,8 @@ PURCHASE_FIELDS = [
"observed_item_key",
"observed_product_id",
"canonical_product_id",
"review_status",
"resolution_action",
"raw_item_name",
"normalized_item_name",
"retailer_item_id",
@@ -60,6 +62,31 @@ EXAMPLE_FIELDS = [
"notes",
]
# Column order for canonical catalog rows; main() passes this list to
# write_csv_rows when it writes the catalog CSV.
CATALOG_FIELDS = [
    # identity
    "canonical_product_id",
    "canonical_name",
    # classification
    "category",
    "product_type",
    "brand",
    "variant",
    # size / packaging
    "size_value",
    "size_unit",
    "pack_qty",
    "measure_type",
    # bookkeeping
    "notes",
    "created_at",
    "updated_at",
]
# Field names of a review-resolution row. These are the keys the resolution
# handling in this file reads from each row.
# NOTE(review): not referenced by name in the visible part of this file;
# presumably the header of the resolutions CSV -- confirm against its writer.
RESOLUTION_FIELDS = [
    # which observed product the decision applies to, and its target
    "observed_product_id",
    "canonical_product_id",
    # the decision itself
    "resolution_action",
    "status",
    # reviewer bookkeeping
    "resolution_notes",
    "reviewed_at",
]
def decimal_or_zero(value):
    """Return ``to_decimal(value)``, or ``Decimal("0")`` when that is falsy.

    Any falsy parse result is replaced, so a parsed zero such as
    ``Decimal("0.00")`` also comes back as plain ``Decimal("0")``.
    """
    parsed = to_decimal(value)
    if parsed:
        return parsed
    return Decimal("0")
@@ -138,7 +165,50 @@ def order_lookup(rows, retailer):
}
def build_link_lookup(enriched_rows):
def read_optional_csv_rows(path):
    """Read CSV rows from *path*, treating a missing path as "no rows".

    Args:
        path: Anything ``Path()`` accepts (string or path object).

    Returns:
        An empty list when nothing exists at *path*; otherwise whatever
        ``read_csv_rows`` returns for it.
    """
    csv_path = Path(path)
    return read_csv_rows(csv_path) if csv_path.exists() else []
def load_resolution_lookup(resolution_rows):
    """Index resolution rows by their ``observed_product_id``.

    Rows whose ``observed_product_id`` is missing or empty are dropped.
    When several rows share an id, the last one in *resolution_rows* wins.

    Returns:
        Dict mapping observed product id to the row object itself (not a copy).
    """
    return {
        entry["observed_product_id"]: entry
        for entry in resolution_rows
        if entry.get("observed_product_id")
    }
def merge_catalog_rows(existing_rows, auto_rows):
    """Merge auto-generated catalog rows with already-stored ones.

    Rows are keyed by ``canonical_product_id``; rows without one are
    dropped. Auto rows are laid down first and existing rows second, so an
    existing row replaces an auto row that has the same id.

    Returns:
        The surviving row objects, sorted by ``canonical_product_id``.
    """
    by_id = {
        entry["canonical_product_id"]: entry
        for entry in auto_rows + existing_rows  # later (existing) entries win
        if entry.get("canonical_product_id", "")
    }
    ordered = list(by_id.values())
    ordered.sort(key=lambda entry: entry["canonical_product_id"])
    return ordered
def catalog_row_from_canonical(row):
    """Project *row* onto the catalog columns.

    Only the thirteen catalog fields are copied; a field absent from *row*
    becomes an empty string, and any other keys in *row* are ignored.

    Returns:
        A new dict with the catalog fields in their fixed order.
    """
    catalog_keys = (
        "canonical_product_id",
        "canonical_name",
        "category",
        "product_type",
        "brand",
        "variant",
        "size_value",
        "size_unit",
        "pack_qty",
        "measure_type",
        "notes",
        "created_at",
        "updated_at",
    )
    return {key: row.get(key, "") for key in catalog_keys}
def build_link_state(enriched_rows):
observed_rows = build_observed_products.build_observed_products(enriched_rows)
canonical_rows, link_rows = build_canonical_layer.build_canonical_layer(observed_rows)
giant_row, costco_row = validate_cross_retailer_flow.find_proof_pair(observed_rows)
@@ -155,12 +225,34 @@ def build_link_lookup(enriched_rows):
canonical_id_by_observed = {
row["observed_product_id"]: row["canonical_product_id"] for row in link_rows
}
return observed_id_by_key, canonical_id_by_observed
return observed_rows, canonical_rows, link_rows, observed_id_by_key, canonical_id_by_observed
def build_purchase_rows(giant_enriched_rows, costco_enriched_rows, giant_orders, costco_orders):
def build_purchase_rows(
giant_enriched_rows,
costco_enriched_rows,
giant_orders,
costco_orders,
resolution_rows,
):
all_enriched_rows = giant_enriched_rows + costco_enriched_rows
observed_id_by_key, canonical_id_by_observed = build_link_lookup(all_enriched_rows)
(
observed_rows,
canonical_rows,
link_rows,
observed_id_by_key,
canonical_id_by_observed,
) = build_link_state(all_enriched_rows)
resolution_lookup = load_resolution_lookup(resolution_rows)
for observed_product_id, resolution in resolution_lookup.items():
action = resolution.get("resolution_action", "")
status = resolution.get("status", "")
if status != "approved":
continue
if action in {"link", "create"} and resolution.get("canonical_product_id"):
canonical_id_by_observed[observed_product_id] = resolution["canonical_product_id"]
elif action == "exclude":
canonical_id_by_observed[observed_product_id] = ""
orders_by_id = {}
orders_by_id.update(order_lookup(giant_orders, "giant"))
orders_by_id.update(order_lookup(costco_orders, "costco"))
@@ -174,6 +266,7 @@ def build_purchase_rows(giant_enriched_rows, costco_enriched_rows, giant_orders,
observed_product_id = observed_id_by_key.get(observed_key, "")
order_row = orders_by_id.get((row["retailer"], row["order_id"]), {})
metrics = derive_metrics(row)
resolution = resolution_lookup.get(observed_product_id, {})
purchase_rows.append(
{
"purchase_date": row["order_date"],
@@ -183,6 +276,8 @@ def build_purchase_rows(giant_enriched_rows, costco_enriched_rows, giant_orders,
"observed_item_key": row["observed_item_key"],
"observed_product_id": observed_product_id,
"canonical_product_id": canonical_id_by_observed.get(observed_product_id, ""),
"review_status": resolution.get("status", ""),
"resolution_action": resolution.get("resolution_action", ""),
"raw_item_name": row["item_name"],
"normalized_item_name": row["item_name_norm"],
"retailer_item_id": row["retailer_item_id"],
@@ -206,7 +301,33 @@ def build_purchase_rows(giant_enriched_rows, costco_enriched_rows, giant_orders,
**metrics,
}
)
return purchase_rows
return purchase_rows, observed_rows, canonical_rows, link_rows
def apply_manual_resolutions_to_links(link_rows, resolution_rows):
    """Overlay approved manual resolutions on the automatic link rows.

    Only resolutions whose ``status`` is ``"approved"`` and that carry an
    ``observed_product_id`` have any effect:

    * ``exclude`` removes the link for that observed product, if any.
    * ``link`` / ``create`` with a non-empty ``canonical_product_id``
      replace (or add) the link with a manual one.

    The input rows are not mutated; link rows are shallow-copied.

    NOTE(review): every approved row is applied in input order, so a later
    approved row for the same observed product overrides an earlier one.
    build_purchase_rows instead consults only the last row per observed
    product (via load_resolution_lookup) -- confirm the two are meant to
    differ.

    Returns:
        The resulting link rows sorted by ``observed_product_id``.
    """
    links = {}
    for link in link_rows:
        links[link["observed_product_id"]] = dict(link)

    for resolution in resolution_rows:
        observed_id = resolution.get("observed_product_id", "")
        if resolution.get("status") != "approved" or not observed_id:
            continue

        action = resolution.get("resolution_action", "")
        if action == "exclude":
            links.pop(observed_id, None)
        elif action in {"link", "create"} and resolution.get("canonical_product_id"):
            links[observed_id] = {
                "observed_product_id": observed_id,
                "canonical_product_id": resolution["canonical_product_id"],
                "link_method": f"manual_{action}",
                "link_confidence": "high",
                "review_status": resolution.get("status", ""),
                "reviewed_by": "",
                "reviewed_at": resolution.get("reviewed_at", ""),
                "link_notes": resolution.get("resolution_notes", ""),
            }

    ordered = list(links.values())
    ordered.sort(key=lambda link: link["observed_product_id"])
    return ordered
def build_comparison_examples(purchase_rows):
@@ -245,6 +366,9 @@ def build_comparison_examples(purchase_rows):
@click.option("--costco-items-enriched-csv", default="costco_output/items_enriched.csv", show_default=True)
@click.option("--giant-orders-csv", default="giant_output/orders.csv", show_default=True)
@click.option("--costco-orders-csv", default="costco_output/orders.csv", show_default=True)
@click.option("--resolutions-csv", default="combined_output/review_resolutions.csv", show_default=True)
@click.option("--catalog-csv", default="combined_output/canonical_catalog.csv", show_default=True)
@click.option("--links-csv", default="combined_output/product_links.csv", show_default=True)
@click.option("--output-csv", default="combined_output/purchases.csv", show_default=True)
@click.option("--examples-csv", default="combined_output/comparison_examples.csv", show_default=True)
def main(
@@ -252,20 +376,34 @@ def main(
costco_items_enriched_csv,
giant_orders_csv,
costco_orders_csv,
resolutions_csv,
catalog_csv,
links_csv,
output_csv,
examples_csv,
):
purchase_rows = build_purchase_rows(
resolution_rows = read_optional_csv_rows(resolutions_csv)
purchase_rows, _observed_rows, canonical_rows, link_rows = build_purchase_rows(
read_csv_rows(giant_items_enriched_csv),
read_csv_rows(costco_items_enriched_csv),
read_csv_rows(giant_orders_csv),
read_csv_rows(costco_orders_csv),
resolution_rows,
)
existing_catalog_rows = read_optional_csv_rows(catalog_csv)
merged_catalog_rows = merge_catalog_rows(
existing_catalog_rows,
[catalog_row_from_canonical(row) for row in canonical_rows],
)
link_rows = apply_manual_resolutions_to_links(link_rows, resolution_rows)
example_rows = build_comparison_examples(purchase_rows)
write_csv_rows(catalog_csv, merged_catalog_rows, CATALOG_FIELDS)
write_csv_rows(links_csv, link_rows, build_canonical_layer.LINK_FIELDS)
write_csv_rows(output_csv, purchase_rows, PURCHASE_FIELDS)
write_csv_rows(examples_csv, example_rows, EXAMPLE_FIELDS)
click.echo(
f"wrote {len(purchase_rows)} purchase rows to {output_csv} "
f"wrote {len(purchase_rows)} purchase rows to {output_csv}, "
f"{len(merged_catalog_rows)} catalog rows to {catalog_csv}, "
f"and {len(example_rows)} comparison examples to {examples_csv}"
)