Build observed product review queue
This commit is contained in:
168
build_review_queue.py
Normal file
168
build_review_queue.py
Normal file
@@ -0,0 +1,168 @@
|
||||
from collections import defaultdict
|
||||
from datetime import date
|
||||
|
||||
import click
|
||||
|
||||
from layer_helpers import compact_join, distinct_values, read_csv_rows, stable_id, write_csv_rows
|
||||
|
||||
|
||||
# Column order for review_queue.csv; write_csv_rows emits columns in this
# exact order. The reviewer-editable fields (canonical_product_id, status,
# resolution_notes, created_at) are read back from the previous output on
# the next rebuild, so keep these names stable.
OUTPUT_FIELDS = [
    "review_id",
    "queue_type",
    "retailer",
    "observed_product_id",
    "canonical_product_id",
    "reason_code",
    "priority",
    "raw_item_names",
    "normalized_names",
    "upc",
    "image_url",
    "example_prices",
    "seen_count",
    "status",
    "resolution_notes",
    "created_at",
    "updated_at",
]
|
||||
|
||||
|
||||
def existing_review_state(path):
    """Load a previously written review-queue CSV, keyed by review_id.

    Returns an empty dict when the file does not exist yet (first run),
    so callers can treat "no prior state" and "empty prior state" alike.
    """
    try:
        rows = read_csv_rows(path)
    except FileNotFoundError:
        return {}
    state = {}
    for row in rows:
        state[row["review_id"]] = row
    return state
|
||||
|
||||
|
||||
def review_reasons(observed_row):
    """Return (reason_code, priority) pairs flagging a product for review.

    Fee line items are never queued. The order of reasons is fixed so the
    resulting queue rows are deterministic for a given input row.
    """
    if observed_row["is_fee"] == "true":
        return []

    # Counts of "" / "0" / "1" mean "unknown or unambiguous" — no conflict.
    unambiguous = {"", "0", "1"}

    flags = []
    if observed_row["distinct_upcs_count"] not in unambiguous:
        flags.append(("multiple_upcs", "high"))
    if observed_row["distinct_item_names_count"] not in unambiguous:
        flags.append(("multiple_raw_names", "medium"))

    # Missing representative attributes, checked in a fixed order.
    missing_checks = (
        ("representative_image_url", "missing_image", "medium"),
        ("representative_upc", "missing_upc", "high"),
        ("representative_name_norm", "missing_normalized_name", "high"),
    )
    for field, reason_code, priority in missing_checks:
        if not observed_row[field]:
            flags.append((reason_code, priority))
    return flags
|
||||
|
||||
|
||||
def build_review_queue(observed_rows, item_rows, existing_rows, today_text):
    """Build review-queue rows for observed products that need human review.

    Args:
        observed_rows: observed-product dicts (one per distinct product).
        item_rows: item dicts already tagged with observed_product_id.
        existing_rows: prior queue keyed by review_id; carries reviewer state
            (canonical assignment, status, notes, created_at) across rebuilds.
        today_text: ISO date string stamped into created_at/updated_at.

    Returns:
        Queue row dicts sorted by priority (high first), then reason code,
        then review_id.
    """
    # Group item rows under their observed product so each queue row can show
    # example raw names, normalized names, and prices.
    by_observed = defaultdict(list)
    for row in item_rows:
        observed_id = row.get("observed_product_id", "")
        if observed_id:
            by_observed[observed_id].append(row)

    queue_rows = []
    for observed_row in observed_rows:
        reasons = review_reasons(observed_row)
        if not reasons:
            continue

        related_items = by_observed.get(observed_row["observed_product_id"], [])
        raw_names = compact_join(distinct_values(related_items, "item_name"), limit=5)
        norm_names = compact_join(
            distinct_values(related_items, "item_name_norm"), limit=5
        )
        example_prices = compact_join(
            distinct_values(related_items, "line_total"), limit=5
        )

        for reason_code, priority in reasons:
            # Deterministic id: the same product + reason always maps to the
            # same review_id, which is what lets existing_rows state survive.
            review_id = stable_id(
                "rvw",
                f"{observed_row['observed_product_id']}|{reason_code}",
            )
            prior = existing_rows.get(review_id, {})
            queue_rows.append(
                {
                    "review_id": review_id,
                    "queue_type": "observed_product",
                    "retailer": observed_row["retailer"],
                    "observed_product_id": observed_row["observed_product_id"],
                    "canonical_product_id": prior.get("canonical_product_id", ""),
                    "reason_code": reason_code,
                    "priority": priority,
                    "raw_item_names": raw_names,
                    "normalized_names": norm_names,
                    "upc": observed_row["representative_upc"],
                    "image_url": observed_row["representative_image_url"],
                    "example_prices": example_prices,
                    "seen_count": observed_row["times_seen"],
                    "status": prior.get("status", "pending"),
                    "resolution_notes": prior.get("resolution_notes", ""),
                    "created_at": prior.get("created_at", today_text),
                    "updated_at": today_text,
                }
            )

    # Sort by an explicit priority rank rather than the raw priority string:
    # alphabetical order only coincidentally puts "high" before "medium" and
    # would misplace a future "low" ahead of "medium". Unknown priorities
    # sort last, still deterministically.
    priority_rank = {"high": 0, "medium": 1, "low": 2}
    queue_rows.sort(
        key=lambda row: (
            priority_rank.get(row["priority"], len(priority_rank)),
            row["reason_code"],
            row["review_id"],
        )
    )
    return queue_rows
|
||||
|
||||
|
||||
def attach_observed_ids(item_rows, observed_rows):
    """Return copies of item_rows with an observed_product_id attached.

    The key mirrors the observed-product keying scheme: a UPC-based key when
    the row carries a truthy UPC, otherwise a composite of normalized name
    and size/pack attributes. Rows with no match get an empty id. Input rows
    are not mutated.
    """
    id_by_key = {row["observed_key"]: row["observed_product_id"] for row in observed_rows}

    def key_for(row):
        # UPC keys take precedence over the attribute-composite key.
        if row.get("upc"):
            parts = [
                row["retailer"],
                f"upc={row['upc']}",
                f"name={row['item_name_norm']}",
            ]
        else:
            parts = [
                row["retailer"],
                f"name={row['item_name_norm']}",
                f"size={row['size_value']}",
                f"unit={row['size_unit']}",
                f"pack={row['pack_qty']}",
                f"measure={row['measure_type']}",
                f"store_brand={row['is_store_brand']}",
                f"fee={row['is_fee']}",
            ]
        return "|".join(parts)

    attached = []
    for row in item_rows:
        enriched = dict(row)
        enriched["observed_product_id"] = id_by_key.get(key_for(row), "")
        attached.append(enriched)
    return attached
|
||||
|
||||
|
||||
@click.command()
@click.option(
    "--observed-csv",
    default="giant_output/products_observed.csv",
    show_default=True,
    help="Path to observed product rows.",
)
@click.option(
    "--items-enriched-csv",
    default="giant_output/items_enriched.csv",
    show_default=True,
    help="Path to enriched Giant item rows.",
)
@click.option(
    "--output-csv",
    default="giant_output/review_queue.csv",
    show_default=True,
    help="Path to review queue output.",
)
def main(observed_csv, items_enriched_csv, output_csv):
    """Rebuild the observed-product review queue CSV.

    Reads any existing output first so reviewer state (status, notes,
    canonical assignments, created_at) survives the rebuild, then
    overwrites the output file with the freshly computed queue.
    """
    observed_rows = read_csv_rows(observed_csv)
    item_rows = read_csv_rows(items_enriched_csv)
    # Link item rows to their observed product so the queue can show examples.
    item_rows = attach_observed_ids(item_rows, observed_rows)
    # Prior output doubles as review state; a missing file means a fresh queue.
    existing_rows = existing_review_state(output_csv)
    today_text = str(date.today())
    queue_rows = build_review_queue(observed_rows, item_rows, existing_rows, today_text)
    write_csv_rows(output_csv, queue_rows, OUTPUT_FIELDS)
    click.echo(f"wrote {len(queue_rows)} rows to {output_csv}")


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user