# Listing metadata: 671 lines, 26 KiB, Python
from collections import defaultdict
|
|
from datetime import date
|
|
import re
|
|
|
|
import click
|
|
|
|
import build_purchases
|
|
from layer_helpers import compact_join, stable_id, write_csv_rows
|
|
|
|
|
|
# Column names handed to write_csv_rows whenever the review queue CSV is
# written; build_review_queue emits dicts with exactly these keys.
# NOTE(review): presumably the list order is the CSV header order -- confirm
# against layer_helpers.write_csv_rows.
QUEUE_FIELDS = [
    "review_id",
    "retailer",
    "normalized_item_id",
    "catalog_id",
    "reason_code",
    "priority",
    "raw_item_names",
    "normalized_names",
    "upc_values",
    "example_prices",
    "seen_count",
    "status",
    "resolution_action",
    "resolution_notes",
    "created_at",
    "updated_at",
]
|
|
|
|
# Foreground colour names passed as ``fg`` to click.secho / click.style below.
INFO_COLOR = "cyan"
PROMPT_COLOR = "bright_yellow"
WARNING_COLOR = "magenta"
# Runs of uppercase ASCII letters and digits; tokenize_match_text upper-cases
# its input before applying this pattern.
TOKEN_RE = re.compile(r"[A-Z0-9]+")
# Fields that must be non-blank for has_complete_catalog_row to accept a row.
REQUIRED_CATALOG_FIELDS = ("catalog_name", "product_type")
|
|
|
|
|
|
def print_intro_text():
    """Print the short review guide: a coloured heading plus three hint lines."""
    click.secho("Review guide:", fg=INFO_COLOR)
    guide_lines = (
        " catalog name: unique product identity including variant, but not packaging",
        " product type: general product you want to compare across purchases",
        " category: broad analysis bucket such as dairy, produce, or frozen",
    )
    for guide_line in guide_lines:
        click.echo(guide_line)
|
|
|
|
|
|
def has_complete_catalog_row(catalog_row):
    """Return True when ``catalog_row`` has a non-blank value for every required field.

    Args:
        catalog_row: Mapping of catalog column names to values; may be empty
            or None.

    Returns:
        False for a falsy ``catalog_row``; otherwise True only if every field
        named in REQUIRED_CATALOG_FIELDS holds text that is non-empty after
        stripping whitespace.
    """
    if not catalog_row:
        return False
    # ``.get(field, "")`` only substitutes the default when the key is absent;
    # a key that is present but holds None (csv.DictReader does this for short
    # rows) would crash on ``.strip()``. Treat None like a blank value instead.
    return all(
        (catalog_row.get(field) or "").strip() for field in REQUIRED_CATALOG_FIELDS
    )
|
|
|
|
|
|
def load_queue_lookup(queue_rows):
    """Index queue rows by their ``normalized_item_id``.

    Rows whose id is missing or empty are left out; when an id repeats, the
    last row seen wins.
    """
    keyed_rows = ((row.get("normalized_item_id", ""), row) for row in queue_rows)
    return {item_id: row for item_id, row in keyed_rows if item_id}
|
|
|
|
|
|
def build_review_queue(
    purchase_rows,
    resolution_rows,
    link_rows=None,
    catalog_rows=None,
    existing_queue_rows=None,
):
    """Build review-queue rows for normalized items lacking a complete catalog link.

    Purchase rows are grouped by ``normalized_item_id`` (rows without one are
    ignored). A group produces no queue row when its resolution is an approved
    ``exclude``, when none of its rows counts as a real item line, or when the
    catalog row it links to passes has_complete_catalog_row. Every other group
    yields one queue row, emitted in ascending id order.

    Args:
        purchase_rows: Iterable of purchase dicts.
        resolution_rows: Rows handed to build_purchases.load_resolution_lookup.
        link_rows: Optional rows handed to build_purchases.load_link_lookup.
        catalog_rows: Optional catalog dicts; rows without ``catalog_id`` are
            ignored, the rest go through build_purchases.normalize_catalog_row.
        existing_queue_rows: Optional previous queue rows whose status,
            resolution fields and ``created_at`` take precedence when set.

    Returns:
        A list of dicts whose keys are the names listed in QUEUE_FIELDS.
    """
    resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows)
    link_lookup = build_purchases.load_link_lookup(link_rows or [])
    catalog_lookup = {}
    for catalog_row in catalog_rows or []:
        catalog_key = catalog_row.get("catalog_id", "")
        if catalog_key:
            catalog_lookup[catalog_key] = build_purchases.normalize_catalog_row(catalog_row)
    queue_lookup = load_queue_lookup(existing_queue_rows or [])

    grouped = defaultdict(list)
    for purchase in purchase_rows:
        item_id = purchase.get("normalized_item_id", "")
        if item_id:
            grouped[item_id].append(purchase)

    def distinct_values(rows, field):
        # Distinct non-empty values of ``field``, sorted, then handed to compact_join.
        return compact_join(sorted({row[field] for row in rows if row[field]}), limit=8)

    def is_reviewable(row):
        # A row counts unless it is flagged as a non-item, fee, discount or coupon line.
        if row.get("is_item", "true") == "false":
            return False
        return all(
            row.get(flag) != "true"
            for flag in ("is_fee", "is_discount_line", "is_coupon_line")
        )

    today_text = str(date.today())
    queue_rows = []
    for normalized_item_id in sorted(grouped):
        rows = grouped[normalized_item_id]
        resolution = resolution_lookup.get(normalized_item_id, {})
        approved_exclude = (
            resolution.get("status") == "approved"
            and resolution.get("resolution_action") == "exclude"
        )
        if approved_exclude:
            continue

        previous = queue_lookup.get(normalized_item_id, {})
        # An explicit resolution wins over the stored product link.
        linked_catalog_id = resolution.get("catalog_id") or link_lookup.get(
            normalized_item_id, {}
        ).get("catalog_id", "")
        linked_catalog_row = catalog_lookup.get(linked_catalog_id, {})
        has_valid_catalog_link = bool(
            linked_catalog_id and has_complete_catalog_row(linked_catalog_row)
        )

        if has_valid_catalog_link or not any(is_reviewable(row) for row in rows):
            continue

        retailers = sorted({row["retailer"] for row in rows})
        review_id = stable_id("rvw", normalized_item_id)
        if not linked_catalog_id:
            reason_code = "missing_catalog_link"
        elif linked_catalog_id not in catalog_lookup:
            reason_code = "orphaned_catalog_link"
        elif not has_complete_catalog_row(linked_catalog_row):
            reason_code = "incomplete_catalog_link"
        else:
            reason_code = "missing_catalog_link"

        queue_row = {
            "review_id": review_id,
            "retailer": " | ".join(retailers),
            "normalized_item_id": normalized_item_id,
            "catalog_id": linked_catalog_id,
            "reason_code": reason_code,
            "priority": "high",
            "raw_item_names": distinct_values(rows, "raw_item_name"),
            "normalized_names": distinct_values(rows, "normalized_item_name"),
            "upc_values": distinct_values(rows, "upc"),
            "example_prices": distinct_values(rows, "line_total"),
            "seen_count": str(len(rows)),
            # Values already present in the previous queue file take
            # precedence over the stored resolution.
            "status": previous.get("status") or resolution.get("status", "pending"),
            "resolution_action": previous.get("resolution_action")
            or resolution.get("resolution_action", ""),
            "resolution_notes": previous.get("resolution_notes")
            or resolution.get("resolution_notes", ""),
            "created_at": previous.get("created_at")
            or resolution.get("reviewed_at", today_text),
            "updated_at": today_text,
        }
        queue_rows.append(queue_row)
    return queue_rows
|
|
|
|
|
|
def save_resolution_rows(path, rows):
    """Write ``rows`` to ``path`` via write_csv_rows with the resolution field list."""
    fieldnames = build_purchases.RESOLUTION_FIELDS
    write_csv_rows(path, rows, fieldnames)
|
|
|
|
|
|
def save_catalog_rows(path, rows):
    """Write ``rows`` to ``path`` via write_csv_rows with the catalog field list."""
    fieldnames = build_purchases.CATALOG_FIELDS
    write_csv_rows(path, rows, fieldnames)
|
|
|
|
|
|
def save_link_rows(path, rows):
    """Write ``rows`` to ``path`` via write_csv_rows with the product-link field list."""
    fieldnames = build_purchases.PRODUCT_LINK_FIELDS
    write_csv_rows(path, rows, fieldnames)
|
|
|
|
|
|
def sort_related_items(rows):
    """Return ``rows`` ordered newest first.

    Rows are ranked by purchase date, then order id, then numeric line number
    (a missing or empty ``line_no`` counts as 0), all in descending order.
    """

    def recency_key(row):
        line_no = int(row.get("line_no", "0") or "0")
        return row.get("purchase_date", ""), row.get("order_id", ""), line_no

    return sorted(rows, key=recency_key, reverse=True)
|
|
|
|
|
|
def tokenize_match_text(*values):
    """Return the set of TOKEN_RE matches found in the upper-cased ``values``.

    Falsy values (None, empty string) contribute no tokens.
    """
    return {
        token
        for value in values
        for token in TOKEN_RE.findall((value or "").upper())
    }
|
|
|
|
|
|
def build_catalog_suggestions(related_rows, purchase_rows, catalog_rows, limit=3):
    """Suggest existing catalog entries for a group of related purchase rows.

    Candidates are considered in three passes, in priority order:
    already-linked purchases sharing a UPC, already-linked purchases sharing a
    normalized name, and catalog rows whose name contains (or is contained in)
    one of the normalized names. Catalog ids that are empty, already suggested
    or absent from ``catalog_rows`` are skipped. Collection stops as soon as
    ``limit`` suggestions have been gathered.

    Returns:
        A list of dicts with ``catalog_id``, ``catalog_name`` and ``reason``.
    """
    wanted_names = set()
    wanted_upcs = set()
    for related in related_rows:
        name = related.get("normalized_item_name", "").strip()
        if name:
            wanted_names.add(name.upper())
        upc = related.get("upc", "").strip()
        if upc:
            wanted_upcs.add(upc)

    catalog_by_id = {}
    for entry in catalog_rows:
        entry_id = entry.get("catalog_id", "")
        if entry_id:
            catalog_by_id[entry_id] = entry

    linked_purchases = [
        purchase
        for purchase in purchase_rows
        if purchase.get("catalog_id") and purchase.get("normalized_item_id")
    ]

    def candidates():
        # Lazily yield (catalog_id, reason) pairs in priority order.
        for purchase in linked_purchases:
            upc = purchase.get("upc", "").strip()
            if upc and upc in wanted_upcs:
                yield purchase.get("catalog_id", ""), "exact upc"
        for purchase in linked_purchases:
            if purchase.get("normalized_item_name", "").strip().upper() in wanted_names:
                yield purchase.get("catalog_id", ""), "exact normalized name"
        for entry in catalog_rows:
            entry_name = entry.get("catalog_name", "").strip().upper()
            if not entry_name:
                continue
            if any(name in entry_name or entry_name in name for name in wanted_names):
                yield entry.get("catalog_id", ""), "catalog name contains match"

    suggestions = []
    suggested_ids = set()
    for catalog_id, reason in candidates():
        if not catalog_id or catalog_id in suggested_ids or catalog_id not in catalog_by_id:
            continue
        suggested_ids.add(catalog_id)
        suggestions.append(
            {
                "catalog_id": catalog_id,
                "catalog_name": catalog_by_id[catalog_id].get("catalog_name", ""),
                "reason": reason,
            }
        )
        if len(suggestions) >= limit:
            break
    return suggestions
|
|
|
|
|
|
def search_catalog_rows(query, catalog_rows, purchase_rows, current_normalized_item_id, limit=10):
    """Rank catalog rows by how many tokens they share with ``query``.

    The catalog entry currently attached to ``current_normalized_item_id`` (as
    seen in ``purchase_rows``; the last matching row wins) is left out, as are
    rows without a ``catalog_id`` or without any token overlap.

    Returns:
        At most ``limit`` dicts sorted by descending score, then catalog name,
        then catalog id. Each dict carries the catalog fields plus
        ``linked_normalized_items``, ``linked_purchase_rows`` and ``score``.
    """
    wanted_tokens = tokenize_match_text(query)
    if not wanted_tokens:
        return []

    purchase_totals = defaultdict(int)
    item_ids_by_catalog = defaultdict(set)
    current_catalog_id = ""
    for purchase in purchase_rows:
        purchase_catalog_id = purchase.get("catalog_id", "")
        if not purchase_catalog_id:
            continue
        item_id = purchase.get("normalized_item_id", "")
        if item_id:
            purchase_totals[purchase_catalog_id] += 1
            item_ids_by_catalog[purchase_catalog_id].add(item_id)
        if item_id == current_normalized_item_id:
            current_catalog_id = purchase_catalog_id

    matches = []
    for entry in catalog_rows:
        catalog_id = entry.get("catalog_id", "")
        if not catalog_id or catalog_id == current_catalog_id:
            continue
        entry_tokens = tokenize_match_text(
            entry.get("catalog_name", ""),
            entry.get("product_type", ""),
            entry.get("variant", ""),
        )
        shared_tokens = wanted_tokens & entry_tokens
        if shared_tokens:
            matches.append(
                {
                    "catalog_id": catalog_id,
                    "catalog_name": entry.get("catalog_name", ""),
                    "product_type": entry.get("product_type", ""),
                    "category": entry.get("category", ""),
                    "variant": entry.get("variant", ""),
                    "linked_normalized_items": len(item_ids_by_catalog.get(catalog_id, ())),
                    "linked_purchase_rows": purchase_totals.get(catalog_id, 0),
                    "score": len(shared_tokens),
                }
            )

    def ranking(match):
        # Highest score first; name and id break ties deterministically.
        return -match["score"], match["catalog_name"], match["catalog_id"]

    return sorted(matches, key=ranking)[:limit]
|
|
|
|
|
|
def suggestion_display_rows(suggestions, purchase_rows, catalog_rows):
    """Decorate suggestion dicts with catalog details and usage counts.

    ``product_type`` and ``category`` come from ``catalog_rows`` when the
    catalog id is present there, otherwise from the first purchase row that
    carries that catalog id. Usage counts only consider purchase rows having
    both a ``catalog_id`` and a ``normalized_item_id``.

    Returns:
        A new list of dicts: each suggestion plus ``product_type``,
        ``category``, ``linked_purchase_rows`` and ``linked_normalized_items``.
    """

    def pick_details(source_row):
        return {
            "product_type": source_row.get("product_type", ""),
            "category": source_row.get("category", ""),
        }

    purchase_totals = {}
    item_ids_by_catalog = {}
    for purchase in purchase_rows:
        purchase_catalog_id = purchase.get("catalog_id", "")
        item_id = purchase.get("normalized_item_id", "")
        if purchase_catalog_id and item_id:
            purchase_totals[purchase_catalog_id] = purchase_totals.get(purchase_catalog_id, 0) + 1
            item_ids_by_catalog.setdefault(purchase_catalog_id, set()).add(item_id)

    details_by_id = {}
    for entry in catalog_rows:
        if entry.get("catalog_id"):
            details_by_id[entry["catalog_id"]] = pick_details(entry)
    # Purchases only fill gaps the catalog left; existing entries are kept.
    for purchase in purchase_rows:
        if purchase.get("catalog_id"):
            details_by_id.setdefault(purchase["catalog_id"], pick_details(purchase))

    decorated = []
    for suggestion in suggestions:
        suggested_id = suggestion["catalog_id"]
        known = details_by_id.get(suggested_id, {})
        decorated.append(
            {
                **suggestion,
                "product_type": known.get("product_type", ""),
                "category": known.get("category", ""),
                "linked_purchase_rows": purchase_totals.get(suggested_id, 0),
                "linked_normalized_items": len(item_ids_by_catalog.get(suggested_id, ())),
            }
        )
    return decorated
|
|
|
|
|
|
def print_catalog_rows(rows):
    """Echo one numbered line per catalog row, starting at 1.

    Each line shows the catalog name, product type, category and the linked
    item / purchase-row counts carried on the row.
    """
    for position, catalog_row in enumerate(rows, start=1):
        summary = (
            f" [{position}] {catalog_row['catalog_name']}, "
            f"{catalog_row.get('product_type', '')}, {catalog_row.get('category', '')}"
        )
        usage = (
            f"({catalog_row['linked_normalized_items']} items, "
            f"{catalog_row['linked_purchase_rows']} rows)"
        )
        click.echo(f"{summary} {usage}")
|
|
|
|
|
|
def build_display_lines(related_rows):
    """Format ``related_rows`` as numbered display lines in sort_related_items order.

    Returns a single placeholder line when there are no rows.
    """
    lines = [
        f" [{position}] {row.get('raw_item_name', '')} | {row.get('retailer', '')} | "
        f"{row.get('purchase_date', '')} | {row.get('line_total', '')} | {row.get('image_url', '')}"
        for position, row in enumerate(sort_related_items(related_rows), start=1)
    ]
    return lines or [" [1] no matched item rows found"]
|
|
|
|
|
|
def normalized_label(queue_row, related_rows):
    """Pick a human-readable label for a queue entry.

    Preference order: the first ``" | "``-separated entry of the queue row's
    ``normalized_names``, then the first non-empty ``normalized_item_name``
    among ``related_rows``, then the queue row's ``normalized_item_id``
    (empty string if that is missing too).
    """
    queued_names = queue_row.get("normalized_names")
    if queued_names:
        return queued_names.split(" | ")[0]
    first_related_name = next(
        (row["normalized_item_name"] for row in related_rows if row.get("normalized_item_name")),
        None,
    )
    if first_related_name is not None:
        return first_related_name
    return queue_row.get("normalized_item_id", "")
|
|
|
|
|
|
def choose_existing_catalog(display_rows, normalized_name, matched_count):
    """Prompt for one of ``display_rows`` and a confirmation of the association.

    Returns:
        ``(catalog_id, "")`` when the user confirms with ``y``; otherwise
        ``("", outcome)`` where outcome is ``"skip"`` for ``s``, ``"quit"``
        for ``q`` and ``"back"`` for both ``n`` and ``b``.
    """
    heading = f"Select the catalog_name to associate {matched_count} items with:"
    click.secho(heading, fg=INFO_COLOR)
    print_catalog_rows(display_rows)
    selection = click.prompt(
        click.style("selection", fg=PROMPT_COLOR),
        type=click.IntRange(1, len(display_rows)),
    )
    chosen_row = display_rows[selection - 1]
    chosen_name = chosen_row["catalog_name"]
    click.echo(
        f'{matched_count} "{normalized_name}" items and future matches will be associated '
        f'with "{chosen_name}".'
    )
    click.secho("actions: [y]es [n]o [b]ack [s]kip [q]uit", fg=PROMPT_COLOR)
    answer = click.prompt(
        click.style("confirm", fg=PROMPT_COLOR),
        type=click.Choice(["y", "n", "b", "s", "q"]),
    )
    if answer == "y":
        return chosen_row["catalog_id"], ""
    # Anything other than skip/quit ("n" or "b") sends the caller back.
    outcome = {"s": "skip", "q": "quit"}.get(answer, "back")
    return "", outcome
|
|
|
|
|
|
def prompt_resolution(queue_row, related_rows, purchase_rows, catalog_rows, queue_index, queue_total):
    """Interactively resolve one review-queue entry.

    Shows the matched purchase rows and any catalog suggestions, then reads an
    action: a suggestion number, ``f`` (search the catalog), ``n`` (create a
    new catalog entry), ``s`` (skip), ``x`` (exclude) or ``q`` (quit). Invalid
    input, an empty search query, or abandoning a search re-displays the entry
    and asks again.

    Args:
        queue_row: Queue dict; ``normalized_item_id`` is required and
            ``resolution_notes`` is reused when skipping.
        related_rows: Purchase rows belonging to this queue entry.
        purchase_rows: All purchase rows (used for suggestions and search).
        catalog_rows: All catalog rows.
        queue_index: 1-based position shown in the heading.
        queue_total: Queue size shown in the heading.

    Returns:
        ``(None, None)`` when the user quits; otherwise
        ``(resolution_row, catalog_row)`` where ``catalog_row`` is a new
        catalog dict for the ``n`` action and ``None`` for every other action.
    """
    # Inputs do not change while prompting, so compute the display data once
    # instead of on every retry.
    suggestions = suggestion_display_rows(
        build_catalog_suggestions(related_rows, purchase_rows, catalog_rows),
        purchase_rows,
        catalog_rows,
    )
    normalized_name = normalized_label(queue_row, related_rows)
    matched_count = len(related_rows)
    prompt_bits = ["[f]ind", "[n]ew", "[s]kip", "e[x]clude", "[q]uit"]
    if suggestions:
        prompt_bits.insert(0, "[#] link to suggestion")
    action_menu = " ".join(prompt_bits) + " >"

    # A loop (rather than self-recursion) keeps the stack flat no matter how
    # many invalid answers are entered.
    while True:
        click.echo("")
        click.secho(
            f"Review {queue_index}/{queue_total}: {normalized_name}",
            fg=INFO_COLOR,
        )
        click.echo(f"{matched_count} matched items:")
        for line in build_display_lines(related_rows):
            click.echo(line)
        if suggestions:
            click.echo(f"{len(suggestions)} catalog_name suggestions found:")
            print_catalog_rows(suggestions)
        else:
            click.echo("no catalog_name suggestions found")
        click.secho(action_menu, fg=PROMPT_COLOR)
        action = click.prompt("", type=str, prompt_suffix=" ").strip().lower()

        # isdecimal(), not isdigit(): isdigit() also accepts characters such as
        # superscript digits that int() rejects with ValueError.
        if action.isdecimal() and suggestions:
            choice = int(action)
            if 1 <= choice <= len(suggestions):
                chosen_row = suggestions[choice - 1]
                notes = click.prompt(
                    click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False
                )
                return _resolution_row(queue_row, chosen_row["catalog_id"], "link", "approved", notes), None
            click.secho("invalid suggestion number", fg=WARNING_COLOR)
            continue
        if action == "q":
            return None, None
        if action == "s":
            # Skipping keeps the entry pending and carries its notes forward.
            return _resolution_row(
                queue_row, "", "skip", "pending", queue_row.get("resolution_notes", "")
            ), None
        if action == "f":
            chosen_row = _prompt_catalog_search(queue_row, purchase_rows, catalog_rows)
            if chosen_row is None:
                continue
            notes = click.prompt(
                click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False
            )
            return _resolution_row(queue_row, chosen_row["catalog_id"], "link", "approved", notes), None
        if action == "x":
            notes = click.prompt(
                click.style("exclude notes", fg=PROMPT_COLOR), default="", show_default=False
            )
            return _resolution_row(queue_row, "", "exclude", "approved", notes), None
        if action == "n":
            break
        click.secho("invalid action", fg=WARNING_COLOR)

    # "n": collect the fields for a brand-new catalog entry.
    # NOTE(review): product type may be left blank here although
    # REQUIRED_CATALOG_FIELDS lists it, so such a row would not pass
    # has_complete_catalog_row -- confirm this is intended.
    catalog_name = click.prompt(click.style("catalog name", fg=PROMPT_COLOR), type=str)
    product_type = click.prompt(click.style("product type", fg=PROMPT_COLOR), default="", show_default=False)
    category = click.prompt(click.style("category", fg=PROMPT_COLOR), default="", show_default=False)
    notes = click.prompt(click.style("notes", fg=PROMPT_COLOR), default="", show_default=False)
    catalog_id = stable_id("cat", f"manual|{catalog_name}|{category}|{product_type}")
    catalog_row = {
        "catalog_id": catalog_id,
        "catalog_name": catalog_name,
        "category": category,
        "product_type": product_type,
        "brand": "",
        "variant": "",
        "size_value": "",
        "size_unit": "",
        "pack_qty": "",
        "measure_type": "",
        "notes": notes,
        "created_at": str(date.today()),
        "updated_at": str(date.today()),
    }
    return _resolution_row(queue_row, catalog_id, "create", "approved", notes), catalog_row


def _resolution_row(queue_row, catalog_id, resolution_action, status, notes):
    """Build a resolution dict for ``queue_row`` stamped with today's date."""
    return {
        "normalized_item_id": queue_row["normalized_item_id"],
        "catalog_id": catalog_id,
        "resolution_action": resolution_action,
        "status": status,
        "resolution_notes": notes,
        "reviewed_at": str(date.today()),
    }


def _prompt_catalog_search(queue_row, purchase_rows, catalog_rows):
    """Run the catalog search dialogue; return the chosen row, or None if abandoned."""
    while True:
        query = click.prompt(
            click.style("search", fg=PROMPT_COLOR), default="", show_default=False
        ).strip()
        if not query:
            return None
        search_rows = search_catalog_rows(
            query,
            catalog_rows,
            purchase_rows,
            queue_row["normalized_item_id"],
        )
        if search_rows:
            break
        click.echo("no matches found")
        retry = click.prompt(
            click.style("search again? [enter=yes, q=no]", fg=PROMPT_COLOR),
            default="",
            show_default=False,
        ).strip().lower()
        if retry == "q":
            return None
    click.echo(f"{len(search_rows)} search results found:")
    print_catalog_rows(search_rows)
    choice = click.prompt(
        click.style("selection", fg=PROMPT_COLOR),
        type=click.IntRange(1, len(search_rows)),
    )
    return search_rows[choice - 1]
|
|
|
|
|
|
def apply_resolution_to_queue(queue_rows, resolution_lookup):
    """Return copies of ``queue_rows`` with any matching resolution applied.

    Rows whose ``normalized_item_id`` has no (non-empty) entry in
    ``resolution_lookup`` are copied unchanged. For the others the catalog id,
    status, action, notes and ``updated_at`` are taken from the resolution;
    an approved resolution also fills in a missing ``created_at``. The input
    rows are never mutated.
    """
    today_text = str(date.today())

    def merged(queue_row):
        updated = dict(queue_row)
        resolution = resolution_lookup.get(queue_row["normalized_item_id"], {})
        if not resolution:
            return updated
        updated.update(
            {
                "catalog_id": resolution.get("catalog_id", ""),
                "status": resolution.get("status", updated.get("status", "pending")),
                "resolution_action": resolution.get("resolution_action", ""),
                "resolution_notes": resolution.get("resolution_notes", ""),
                "updated_at": resolution.get("reviewed_at", today_text),
            }
        )
        if resolution.get("status") == "approved":
            updated["created_at"] = updated.get("created_at") or resolution.get(
                "reviewed_at", today_text
            )
        return updated

    return [merged(queue_row) for queue_row in queue_rows]
|
|
|
|
|
|
def link_rows_from_state(link_lookup):
    """Return the link dicts held in ``link_lookup`` sorted by ``normalized_item_id``."""
    ordered_links = list(link_lookup.values())
    ordered_links.sort(key=lambda link: link["normalized_item_id"])
    return ordered_links
|
|
|
|
|
|
@click.command()
@click.option("--giant-items-enriched-csv", default="data/giant-web/normalized_items.csv", show_default=True)
@click.option("--costco-items-enriched-csv", default="data/costco-web/normalized_items.csv", show_default=True)
@click.option("--giant-orders-csv", default="data/giant-web/collected_orders.csv", show_default=True)
@click.option("--costco-orders-csv", default="data/costco-web/collected_orders.csv", show_default=True)
@click.option("--purchases-csv", default="data/analysis/purchases.csv", show_default=True)
@click.option("--queue-csv", default="data/review/review_queue.csv", show_default=True)
@click.option("--resolutions-csv", default="data/review/review_resolutions.csv", show_default=True)
@click.option("--catalog-csv", default="data/review/catalog.csv", show_default=True)
@click.option("--links-csv", default="data/review/product_links.csv", show_default=True)
@click.option("--limit", default=0, show_default=True, type=int)
@click.option("--refresh-only", is_flag=True, help="Only rebuild review_queue.csv without prompting.")
def main(
    giant_items_enriched_csv,
    costco_items_enriched_csv,
    giant_orders_csv,
    costco_orders_csv,
    purchases_csv,
    queue_csv,
    resolutions_csv,
    catalog_csv,
    links_csv,
    limit,
    refresh_only,
):
    """Rebuild the purchases and review-queue CSVs, then run interactive review.

    With ``--refresh-only`` the command stops after writing the queue file.
    Otherwise each queue entry is prompted in turn (at most ``--limit``
    entries when the limit is non-zero) and the queue, resolution, catalog
    and link files are rewritten after every answer, and once more on exit.
    """
    read_rows = build_purchases.read_optional_csv_rows

    resolution_rows = read_rows(resolutions_csv)
    catalog_rows = build_purchases.merge_catalog_rows(read_rows(catalog_csv), [])
    link_rows = read_rows(links_csv)
    giant_items = read_rows(giant_items_enriched_csv)
    costco_items = read_rows(costco_items_enriched_csv)
    giant_orders = read_rows(giant_orders_csv)
    costco_orders = read_rows(costco_orders_csv)
    purchase_rows, refreshed_link_rows = build_purchases.build_purchase_rows(
        giant_items,
        costco_items,
        giant_orders,
        costco_orders,
        resolution_rows,
        link_rows,
        catalog_rows,
    )
    build_purchases.write_csv_rows(purchases_csv, purchase_rows, build_purchases.PURCHASE_FIELDS)
    link_lookup = build_purchases.load_link_lookup(refreshed_link_rows)

    existing_queue_rows = read_rows(queue_csv)
    queue_rows = build_review_queue(
        purchase_rows,
        resolution_rows,
        refreshed_link_rows,
        catalog_rows,
        existing_queue_rows,
    )
    write_csv_rows(queue_csv, queue_rows, QUEUE_FIELDS)
    click.echo(f"wrote {len(queue_rows)} rows to {queue_csv}")

    if refresh_only:
        return

    print_intro_text()
    resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows)
    catalog_by_id = {}
    for existing_catalog_row in catalog_rows:
        if existing_catalog_row.get("catalog_id"):
            catalog_by_id[existing_catalog_row["catalog_id"]] = existing_catalog_row
    rows_by_normalized = defaultdict(list)
    for purchase in purchase_rows:
        purchase_item_id = purchase.get("normalized_item_id", "")
        if purchase_item_id:
            rows_by_normalized[purchase_item_id].append(purchase)

    def persist_review_state():
        # Rewrite the resolution, catalog and link files from in-memory state.
        save_resolution_rows(
            resolutions_csv,
            sorted(resolution_lookup.values(), key=lambda row: row["normalized_item_id"]),
        )
        save_catalog_rows(catalog_csv, sorted(catalog_by_id.values(), key=lambda row: row["catalog_id"]))
        save_link_rows(links_csv, link_rows_from_state(link_lookup))

    # Prompts walk the queue as first built; ``queue_rows`` is rebound to an
    # updated copy after each answer, which never changes its length.
    initial_queue = queue_rows
    queue_total = len(initial_queue)
    reviewed = 0
    for index, queue_row in enumerate(initial_queue, start=1):
        if limit and reviewed >= limit:
            break
        related_rows = rows_by_normalized.get(queue_row["normalized_item_id"], [])
        resolution_row, catalog_row = prompt_resolution(
            queue_row, related_rows, purchase_rows, catalog_rows, index, queue_total
        )
        if resolution_row is None and catalog_row is None:
            break

        normalized_item_id = resolution_row["normalized_item_id"]
        resolution_lookup[normalized_item_id] = resolution_row
        if catalog_row and catalog_row["catalog_id"] not in catalog_by_id:
            catalog_by_id[catalog_row["catalog_id"]] = catalog_row
            catalog_rows.append(catalog_row)

        if resolution_row["status"] == "approved":
            resolution_action = resolution_row["resolution_action"]
            if resolution_action in {"link", "create"} and resolution_row.get("catalog_id"):
                link_lookup[normalized_item_id] = {
                    "normalized_item_id": normalized_item_id,
                    "catalog_id": resolution_row["catalog_id"],
                    "link_method": f"manual_{resolution_action}",
                    "link_confidence": "high",
                    "review_status": "approved",
                    "reviewed_by": "",
                    "reviewed_at": resolution_row.get("reviewed_at", ""),
                    "link_notes": resolution_row.get("resolution_notes", ""),
                }
            elif resolution_action == "exclude":
                link_lookup.pop(normalized_item_id, None)

        queue_rows = apply_resolution_to_queue(queue_rows, resolution_lookup)
        write_csv_rows(queue_csv, queue_rows, QUEUE_FIELDS)
        persist_review_state()
        reviewed += 1

    persist_review_state()
    click.echo(
        f"saved {len(resolution_lookup)} resolution rows to {resolutions_csv}, "
        f"{len(catalog_by_id)} catalog rows to {catalog_csv}, "
        f"and {len(link_lookup)} product links to {links_csv}"
    )
|
|
|
|
|
|
# Run the click command only when this file is executed as a script.
if __name__ == "__main__":
    main()
|