Files
scrape-giant/build_canonical_layer.py

213 lines
6.5 KiB
Python

import math

import click

from layer_helpers import read_csv_rows, representative_value, stable_id, write_csv_rows
# Field names for the canonical product CSV. main() passes this list to
# write_csv_rows together with the rows produced by canonical_row_for_group(),
# whose dict keys are exactly these names.
# NOTE(review): presumably this list also fixes the CSV column order -- confirm
# against write_csv_rows in layer_helpers.
CANONICAL_FIELDS = [
    # identity and descriptive attributes
    "canonical_product_id", "canonical_name", "product_type",
    "brand", "variant",
    # package size fields copied from the group's representative values
    "size_value", "size_unit", "pack_qty", "measure_type",
    # quantity derived by normalized_quantity()
    "normalized_quantity", "normalized_quantity_unit",
    # free-text note plus timestamps
    "notes", "created_at", "updated_at",
]
# Field names for the observed-to-canonical link CSV. main() passes this list
# to write_csv_rows together with the link rows built in
# build_canonical_layer(), whose dict keys are exactly these names.
LINK_FIELDS = [
    # which observed product maps to which canonical product
    "observed_product_id", "canonical_product_id",
    # how the link was made and how much it is trusted
    "link_method", "link_confidence",
    # review columns; build_canonical_layer() writes them as empty strings
    "review_status", "reviewed_by", "reviewed_at", "link_notes",
]
def to_float(value):
    """Convert *value* to a finite float, or return None if that is impossible.

    Args:
        value: Anything ``float()`` accepts (typically a CSV string); may be
            None or an empty string.

    Returns:
        The parsed float, or None when the value is missing, not numeric,
        too large to represent, or not finite.
    """
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError: None / unsupported type; ValueError: "" or non-numeric
        # text; OverflowError: an int too large for a float.
        return None
    # float() happily parses "nan", "inf" and "-inf"; none of them is a usable
    # quantity, and they would otherwise be formatted straight into the output.
    return number if math.isfinite(number) else None
def normalized_quantity(row):
    """Derive a ``(quantity, unit)`` pair of strings from a row's size fields.

    The row is read through its ``representative_size_value``,
    ``representative_size_unit``, ``representative_pack_qty`` and
    ``representative_measure_type`` keys. Rules, first match wins:

    * a parseable size value with a non-empty unit gives size times pack
      quantity, formatted with the ``"g"`` presentation type, in that unit;
    * otherwise a non-empty pack quantity with measure type ``"count"`` gives
      the pack quantity exactly as stored, with unit ``"count"``;
    * otherwise measure type ``"each"`` gives ``("1", "each")``;
    * otherwise ``("", "")``.
    """
    unit = row.get("representative_size_unit", "")
    kind = row.get("representative_measure_type", "")
    raw_pack = row.get("representative_pack_qty")

    amount = to_float(row.get("representative_size_value"))
    # A missing, unparseable or zero pack quantity counts as a single pack.
    multiplier = to_float(raw_pack) or 1.0

    if amount is None or not unit:
        if raw_pack and kind == "count":
            return raw_pack, "count"
        if kind == "each":
            return "1", "each"
        return "", ""

    return format(amount * multiplier, "g"), unit
def auto_link_rule(observed_row):
    """Choose the automatic linking rule for one observed product row.

    Rules are tried in this order; the first that applies wins:

    1. ``is_fee == "true"``: never linked.
    2. A non-empty ``representative_upc``: ``"exact_upc"``, high confidence.
    3. Non-empty normalized name, size value and size unit:
       ``"exact_name_size"``, high confidence.
    4. Non-empty normalized name with no size value, size unit or pack
       quantity: ``"exact_name"``, medium confidence.

    Args:
        observed_row: Mapping of observed-product fields. Every field is
            optional; a missing pack quantity or measure type is treated the
            same as an empty string instead of raising ``KeyError``.

    Returns:
        A ``(link_method, group_key, confidence)`` tuple of strings, or three
        empty strings when no rule applies.
    """
    no_link = ("", "", "")

    if observed_row.get("is_fee") == "true":
        return no_link

    upc = observed_row.get("representative_upc")
    if upc:
        return "exact_upc", f"upc={upc}", "high"

    name = observed_row.get("representative_name_norm")
    if not name:
        # Both remaining rules require a normalized name.
        return no_link

    size_value = observed_row.get("representative_size_value")
    size_unit = observed_row.get("representative_size_unit")
    # Defaults only kick in when the key is absent, so the key text built for
    # rows that do carry these fields is unchanged (and so are the IDs derived
    # from it by the caller).
    pack_qty = observed_row.get("representative_pack_qty", "")
    measure_type = observed_row.get("representative_measure_type", "")

    if size_value and size_unit:
        group_key = "|".join(
            [
                f"name={name}",
                f"size={size_value}",
                f"unit={size_unit}",
                f"pack={pack_qty}",
                f"measure={measure_type}",
            ]
        )
        return "exact_name_size", group_key, "high"

    if not size_value and not size_unit and not pack_qty:
        group_key = "|".join([f"name={name}", f"measure={measure_type}"])
        return "exact_name", group_key, "medium"

    # Partial size information (e.g. a pack quantity without a size) is too
    # ambiguous to link automatically.
    return no_link
def canonical_row_for_group(canonical_product_id, group_rows, link_method):
    """Build the canonical product row for one group of linked observed rows.

    Args:
        canonical_product_id: Identifier to store on the canonical row.
        group_rows: Observed rows that were linked to this canonical product.
        link_method: Name of the rule that formed the group; recorded in
            the ``notes`` column.

    Returns:
        A dict with one entry per ``CANONICAL_FIELDS`` name. ``product_type``,
        ``created_at`` and ``updated_at`` are left as empty strings.
    """
    # Each field is resolved once and reused for both the quantity calculation
    # and the output row.
    # NOTE(review): assumes representative_value (layer_helpers) is a pure
    # lookup over group_rows -- confirm.
    size_value = representative_value(group_rows, "representative_size_value")
    size_unit = representative_value(group_rows, "representative_size_unit")
    pack_qty = representative_value(group_rows, "representative_pack_qty")
    measure_type = representative_value(group_rows, "representative_measure_type")

    quantity_value, quantity_unit = normalized_quantity(
        {
            "representative_size_value": size_value,
            "representative_size_unit": size_unit,
            "representative_pack_qty": pack_qty,
            "representative_measure_type": measure_type,
        }
    )

    canonical = {
        "canonical_product_id": canonical_product_id,
        "canonical_name": representative_value(group_rows, "representative_name_norm"),
        "product_type": "",
        "brand": representative_value(group_rows, "representative_brand"),
        "variant": representative_value(group_rows, "representative_variant"),
        "size_value": size_value,
        "size_unit": size_unit,
        "pack_qty": pack_qty,
        "measure_type": measure_type,
        "normalized_quantity": quantity_value,
        "normalized_quantity_unit": quantity_unit,
        "notes": f"auto-linked via {link_method}",
        "created_at": "",
        "updated_at": "",
    }
    return canonical
def build_canonical_layer(observed_rows):
    """Group observed products into canonical products and link them.

    Rows are processed in ``observed_product_id`` order. Each row that
    ``auto_link_rule`` can place gets a canonical ID derived by ``stable_id``
    from the link method and group key; rows it cannot place are skipped.

    Args:
        observed_rows: Iterable of observed-product dicts; each must have an
            ``observed_product_id`` key.

    Returns:
        ``(canonical_rows, link_rows)``: one canonical row per group, ordered
        by canonical ID, and one link row per linked observed row, in
        processing order. Review columns on link rows are empty strings.
    """
    links = []
    grouped = {}

    ordered_rows = sorted(observed_rows, key=lambda row: row["observed_product_id"])
    for row in ordered_rows:
        method, key, confidence = auto_link_rule(row)
        if key:
            canonical_id = stable_id("gcan", f"{method}|{key}")
            bucket = grouped.setdefault(canonical_id, {"method": method, "rows": []})
            bucket["rows"].append(row)
            links.append(
                {
                    "observed_product_id": row["observed_product_id"],
                    "canonical_product_id": canonical_id,
                    "link_method": method,
                    "link_confidence": confidence,
                    "review_status": "",
                    "reviewed_by": "",
                    "reviewed_at": "",
                    "link_notes": "",
                }
            )

    # IDs are unique dict keys, so sorting the keys gives the same order as
    # sorting the (id, group) pairs.
    canonicals = [
        canonical_row_for_group(
            canonical_id,
            grouped[canonical_id]["rows"],
            grouped[canonical_id]["method"],
        )
        for canonical_id in sorted(grouped)
    ]
    return canonicals, links
@click.command()
@click.option(
    "--observed-csv",
    help="Path to observed product rows.",
    default="giant_output/products_observed.csv",
    show_default=True,
)
@click.option(
    "--canonical-csv",
    help="Path to canonical product output.",
    default="giant_output/products_canonical.csv",
    show_default=True,
)
@click.option(
    "--links-csv",
    help="Path to observed-to-canonical link output.",
    default="giant_output/product_links.csv",
    show_default=True,
)
def main(observed_csv, canonical_csv, links_csv):
    # CLI entry point: read the observed products CSV, build the canonical
    # layer, write the canonical and link CSVs, and print a one-line summary.
    # Documented with comments rather than a docstring on purpose: click shows
    # a command's docstring in --help, so adding one would change the output.
    canonical_rows, link_rows = build_canonical_layer(read_csv_rows(observed_csv))

    write_csv_rows(canonical_csv, canonical_rows, CANONICAL_FIELDS)
    write_csv_rows(links_csv, link_rows, LINK_FIELDS)

    summary = (
        f"wrote {len(canonical_rows)} canonical rows to {canonical_csv} and "
        f"{len(link_rows)} links to {links_csv}"
    )
    click.echo(summary)
# Run the click command only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == "__main__":
    main()