Compare commits

8 Commits

Author SHA1 Message Date
ben
eddef7de2b updated readme and prep for next phase 2026-03-17 13:59:57 -04:00
ben
83bc6c4a7c Update t1.12 task evidence 2026-03-17 13:25:21 -04:00
ben
d39497c298 Refine product review prompt flow 2026-03-17 13:25:12 -04:00
ben
7b8141cd42 Improve product review display workflow 2026-03-17 12:25:47 -04:00
ben
e494386e64 build_purchases rev1 2026-03-17 12:21:44 -04:00
ben
7527fe37eb added git notes 2026-03-17 12:21:24 -04:00
ben
a1fafa3885 added t1.12 scope to simplify review process 2026-03-17 12:20:48 -04:00
ben
37b2196023 added git notes 2026-03-17 09:23:00 -04:00
7 changed files with 681 additions and 112 deletions

View File

@@ -1,17 +1,17 @@
# scrape-giant # scrape-giant
Small CLI pipeline for pulling purchase history from Giant and Costco, enriching line items, and building a reviewable cross-retailer purchase dataset. CLI to pull purchase history from Giant and Costco websites and refine into a single product catalog for external analysis.
There is no one-shot runner yet. Today, you run the scripts step by step from the terminal. Run each script step-by-step from the terminal.
## What It Does ## What It Does
- `scrape_giant.py`: download Giant orders and items 1. `scrape_giant.py`: download Giant orders and items
- `enrich_giant.py`: normalize Giant line items 2. `enrich_giant.py`: normalize Giant line items
- `scrape_costco.py`: download Costco orders and items 3. `scrape_costco.py`: download Costco orders and items
- `enrich_costco.py`: normalize Costco line items 4. `enrich_costco.py`: normalize Costco line items
- `build_purchases.py`: combine retailer outputs into one purchase table 5. `build_purchases.py`: combine retailer outputs into one purchase table
- `review_products.py`: review unresolved product matches in the terminal 6. `review_products.py`: review unresolved product matches in the terminal
## Requirements ## Requirements
@@ -36,7 +36,6 @@ Current version works best with `.env` in the project root. The scraper will pr
GIANT_USER_ID=... GIANT_USER_ID=...
GIANT_LOYALTY_NUMBER=... GIANT_LOYALTY_NUMBER=...
# Costco can use these if present, but it can also pull session values from Firefox.
COSTCO_X_AUTHORIZATION=... COSTCO_X_AUTHORIZATION=...
COSTCO_X_WCS_CLIENTID=... COSTCO_X_WCS_CLIENTID=...
COSTCO_CLIENT_IDENTIFIER=... COSTCO_CLIENT_IDENTIFIER=...
@@ -89,18 +88,14 @@ Combined:
## Review Workflow ## Review Workflow
`review_products.py` is the manual cleanup step for unresolved or weakly unified items. Run `review_products.py` to clean up unresolved or weakly unified items:
In the terminal, you can:
- link an item to an existing canonical product - link an item to an existing canonical product
- create a new canonical product - create a new canonical product
- exclude an item - exclude an item
- skip it for later - skip it for later
Decisions are saved and reused on later runs.
Those decisions are saved and reused on later runs.
## Notes ## Notes
- This project is designed around fragile retailer scraping flows, so the code favors explicit retailer-specific steps over heavy abstraction. - This project is designed around fragile retailer scraping flows, so the code favors explicit retailer-specific steps over heavy abstraction.
- `scrape_giant.py` and `scrape_costco.py` are meant to work as standalone acquisition scripts. - `scrape_giant.py` and `scrape_costco.py` are meant to work as standalone acquisition scripts.
- `validate_cross_retailer_flow.py` is a proof/check script, not a required production step. - `validate_cross_retailer_flow.py` is a proof/check script, not a required production step.

View File

@@ -7,11 +7,7 @@ import build_canonical_layer
import build_observed_products import build_observed_products
import validate_cross_retailer_flow import validate_cross_retailer_flow
from enrich_giant import format_decimal, to_decimal from enrich_giant import format_decimal, to_decimal
<<<<<<< HEAD
from layer_helpers import read_csv_rows, stable_id, write_csv_rows from layer_helpers import read_csv_rows, stable_id, write_csv_rows
=======
from layer_helpers import read_csv_rows, write_csv_rows
>>>>>>> be1bf63 (Build pivot-ready purchase log)
PURCHASE_FIELDS = [ PURCHASE_FIELDS = [
@@ -22,13 +18,11 @@ PURCHASE_FIELDS = [
"observed_item_key", "observed_item_key",
"observed_product_id", "observed_product_id",
"canonical_product_id", "canonical_product_id",
<<<<<<< HEAD
"review_status", "review_status",
"resolution_action", "resolution_action",
=======
>>>>>>> be1bf63 (Build pivot-ready purchase log)
"raw_item_name", "raw_item_name",
"normalized_item_name", "normalized_item_name",
"image_url",
"retailer_item_id", "retailer_item_id",
"upc", "upc",
"qty", "qty",
@@ -69,7 +63,6 @@ EXAMPLE_FIELDS = [
"notes", "notes",
] ]
<<<<<<< HEAD
CATALOG_FIELDS = [ CATALOG_FIELDS = [
"canonical_product_id", "canonical_product_id",
"canonical_name", "canonical_name",
@@ -95,8 +88,6 @@ RESOLUTION_FIELDS = [
"reviewed_at", "reviewed_at",
] ]
=======
>>>>>>> be1bf63 (Build pivot-ready purchase log)
def decimal_or_zero(value): def decimal_or_zero(value):
return to_decimal(value) or Decimal("0") return to_decimal(value) or Decimal("0")
@@ -175,7 +166,6 @@ def order_lookup(rows, retailer):
} }
<<<<<<< HEAD
def read_optional_csv_rows(path): def read_optional_csv_rows(path):
path = Path(path) path = Path(path)
if not path.exists(): if not path.exists():
@@ -220,9 +210,6 @@ def catalog_row_from_canonical(row):
def build_link_state(enriched_rows): def build_link_state(enriched_rows):
=======
def build_link_lookup(enriched_rows):
>>>>>>> be1bf63 (Build pivot-ready purchase log)
observed_rows = build_observed_products.build_observed_products(enriched_rows) observed_rows = build_observed_products.build_observed_products(enriched_rows)
canonical_rows, link_rows = build_canonical_layer.build_canonical_layer(observed_rows) canonical_rows, link_rows = build_canonical_layer.build_canonical_layer(observed_rows)
giant_row, costco_row = validate_cross_retailer_flow.find_proof_pair(observed_rows) giant_row, costco_row = validate_cross_retailer_flow.find_proof_pair(observed_rows)
@@ -239,7 +226,6 @@ def build_link_lookup(enriched_rows):
canonical_id_by_observed = { canonical_id_by_observed = {
row["observed_product_id"]: row["canonical_product_id"] for row in link_rows row["observed_product_id"]: row["canonical_product_id"] for row in link_rows
} }
<<<<<<< HEAD
return observed_rows, canonical_rows, link_rows, observed_id_by_key, canonical_id_by_observed return observed_rows, canonical_rows, link_rows, observed_id_by_key, canonical_id_by_observed
@@ -268,14 +254,6 @@ def build_purchase_rows(
canonical_id_by_observed[observed_product_id] = resolution["canonical_product_id"] canonical_id_by_observed[observed_product_id] = resolution["canonical_product_id"]
elif action == "exclude": elif action == "exclude":
canonical_id_by_observed[observed_product_id] = "" canonical_id_by_observed[observed_product_id] = ""
=======
return observed_id_by_key, canonical_id_by_observed
def build_purchase_rows(giant_enriched_rows, costco_enriched_rows, giant_orders, costco_orders):
all_enriched_rows = giant_enriched_rows + costco_enriched_rows
observed_id_by_key, canonical_id_by_observed = build_link_lookup(all_enriched_rows)
>>>>>>> be1bf63 (Build pivot-ready purchase log)
orders_by_id = {} orders_by_id = {}
orders_by_id.update(order_lookup(giant_orders, "giant")) orders_by_id.update(order_lookup(giant_orders, "giant"))
orders_by_id.update(order_lookup(costco_orders, "costco")) orders_by_id.update(order_lookup(costco_orders, "costco"))
@@ -289,10 +267,7 @@ def build_purchase_rows(giant_enriched_rows, costco_enriched_rows, giant_orders,
observed_product_id = observed_id_by_key.get(observed_key, "") observed_product_id = observed_id_by_key.get(observed_key, "")
order_row = orders_by_id.get((row["retailer"], row["order_id"]), {}) order_row = orders_by_id.get((row["retailer"], row["order_id"]), {})
metrics = derive_metrics(row) metrics = derive_metrics(row)
<<<<<<< HEAD
resolution = resolution_lookup.get(observed_product_id, {}) resolution = resolution_lookup.get(observed_product_id, {})
=======
>>>>>>> be1bf63 (Build pivot-ready purchase log)
purchase_rows.append( purchase_rows.append(
{ {
"purchase_date": row["order_date"], "purchase_date": row["order_date"],
@@ -302,13 +277,11 @@ def build_purchase_rows(giant_enriched_rows, costco_enriched_rows, giant_orders,
"observed_item_key": row["observed_item_key"], "observed_item_key": row["observed_item_key"],
"observed_product_id": observed_product_id, "observed_product_id": observed_product_id,
"canonical_product_id": canonical_id_by_observed.get(observed_product_id, ""), "canonical_product_id": canonical_id_by_observed.get(observed_product_id, ""),
<<<<<<< HEAD
"review_status": resolution.get("status", ""), "review_status": resolution.get("status", ""),
"resolution_action": resolution.get("resolution_action", ""), "resolution_action": resolution.get("resolution_action", ""),
=======
>>>>>>> be1bf63 (Build pivot-ready purchase log)
"raw_item_name": row["item_name"], "raw_item_name": row["item_name"],
"normalized_item_name": row["item_name_norm"], "normalized_item_name": row["item_name_norm"],
"image_url": row.get("image_url", ""),
"retailer_item_id": row["retailer_item_id"], "retailer_item_id": row["retailer_item_id"],
"upc": row["upc"], "upc": row["upc"],
"qty": row["qty"], "qty": row["qty"],
@@ -330,7 +303,6 @@ def build_purchase_rows(giant_enriched_rows, costco_enriched_rows, giant_orders,
**metrics, **metrics,
} }
) )
<<<<<<< HEAD
return purchase_rows, observed_rows, canonical_rows, link_rows return purchase_rows, observed_rows, canonical_rows, link_rows
@@ -358,9 +330,6 @@ def apply_manual_resolutions_to_links(link_rows, resolution_rows):
"link_notes": resolution.get("resolution_notes", ""), "link_notes": resolution.get("resolution_notes", ""),
} }
return sorted(link_by_observed.values(), key=lambda row: row["observed_product_id"]) return sorted(link_by_observed.values(), key=lambda row: row["observed_product_id"])
=======
return purchase_rows
>>>>>>> be1bf63 (Build pivot-ready purchase log)
def build_comparison_examples(purchase_rows): def build_comparison_examples(purchase_rows):
@@ -399,12 +368,9 @@ def build_comparison_examples(purchase_rows):
@click.option("--costco-items-enriched-csv", default="costco_output/items_enriched.csv", show_default=True) @click.option("--costco-items-enriched-csv", default="costco_output/items_enriched.csv", show_default=True)
@click.option("--giant-orders-csv", default="giant_output/orders.csv", show_default=True) @click.option("--giant-orders-csv", default="giant_output/orders.csv", show_default=True)
@click.option("--costco-orders-csv", default="costco_output/orders.csv", show_default=True) @click.option("--costco-orders-csv", default="costco_output/orders.csv", show_default=True)
<<<<<<< HEAD
@click.option("--resolutions-csv", default="combined_output/review_resolutions.csv", show_default=True) @click.option("--resolutions-csv", default="combined_output/review_resolutions.csv", show_default=True)
@click.option("--catalog-csv", default="combined_output/canonical_catalog.csv", show_default=True) @click.option("--catalog-csv", default="combined_output/canonical_catalog.csv", show_default=True)
@click.option("--links-csv", default="combined_output/product_links.csv", show_default=True) @click.option("--links-csv", default="combined_output/product_links.csv", show_default=True)
=======
>>>>>>> be1bf63 (Build pivot-ready purchase log)
@click.option("--output-csv", default="combined_output/purchases.csv", show_default=True) @click.option("--output-csv", default="combined_output/purchases.csv", show_default=True)
@click.option("--examples-csv", default="combined_output/comparison_examples.csv", show_default=True) @click.option("--examples-csv", default="combined_output/comparison_examples.csv", show_default=True)
def main( def main(
@@ -412,7 +378,6 @@ def main(
costco_items_enriched_csv, costco_items_enriched_csv,
giant_orders_csv, giant_orders_csv,
costco_orders_csv, costco_orders_csv,
<<<<<<< HEAD
resolutions_csv, resolutions_csv,
catalog_csv, catalog_csv,
links_csv, links_csv,
@@ -421,17 +386,10 @@ def main(
): ):
resolution_rows = read_optional_csv_rows(resolutions_csv) resolution_rows = read_optional_csv_rows(resolutions_csv)
purchase_rows, _observed_rows, canonical_rows, link_rows = build_purchase_rows( purchase_rows, _observed_rows, canonical_rows, link_rows = build_purchase_rows(
=======
output_csv,
examples_csv,
):
purchase_rows = build_purchase_rows(
>>>>>>> be1bf63 (Build pivot-ready purchase log)
read_csv_rows(giant_items_enriched_csv), read_csv_rows(giant_items_enriched_csv),
read_csv_rows(costco_items_enriched_csv), read_csv_rows(costco_items_enriched_csv),
read_csv_rows(giant_orders_csv), read_csv_rows(giant_orders_csv),
read_csv_rows(costco_orders_csv), read_csv_rows(costco_orders_csv),
<<<<<<< HEAD
resolution_rows, resolution_rows,
) )
existing_catalog_rows = read_optional_csv_rows(catalog_csv) existing_catalog_rows = read_optional_csv_rows(catalog_csv)
@@ -448,14 +406,6 @@ def main(
click.echo( click.echo(
f"wrote {len(purchase_rows)} purchase rows to {output_csv}, " f"wrote {len(purchase_rows)} purchase rows to {output_csv}, "
f"{len(merged_catalog_rows)} catalog rows to {catalog_csv}, " f"{len(merged_catalog_rows)} catalog rows to {catalog_csv}, "
=======
)
example_rows = build_comparison_examples(purchase_rows)
write_csv_rows(output_csv, purchase_rows, PURCHASE_FIELDS)
write_csv_rows(examples_csv, example_rows, EXAMPLE_FIELDS)
click.echo(
f"wrote {len(purchase_rows)} purchase rows to {output_csv} "
>>>>>>> be1bf63 (Build pivot-ready purchase log)
f"and {len(example_rows)} comparison examples to {examples_csv}" f"and {len(example_rows)} comparison examples to {examples_csv}"
) )

View File

@@ -27,6 +27,8 @@ carry forward image url
3. build observed-product table from enriched items 3. build observed-product table from enriched items
* git issues * git issues
** ssh / access to gitea
ssh://git@192.168.1.207:2020/ben/scrape-giant.git ssh://git@192.168.1.207:2020/ben/scrape-giant.git
https://git.hgsky.me/ben/scrape-giant.git https://git.hgsky.me/ben/scrape-giant.git
@@ -44,6 +46,31 @@ git remote set-url gitea git@gitea:ben/scrape-giant.git
on local network: use ssh to 192.168.1.207:2020 on local network: use ssh to 192.168.1.207:2020
from elsewhere/public: use https to git.hgsky.me/... unless you later expose ssh properly from elsewhere/public: use https to git.hgsky.me/... unless you later expose ssh properly
** stash
z z to stash local work only
take care not to add ignored files which will add the venv and `__pycache__`
z p to pop the stash back
** creating remote branches
P p, magit will suggest upstream (gitea), select and Enter and it will be created
** cherry-picking
b b : switch to desired branch (review)
l B : open reflog for local branches
(my changes were committed to local cx but not pushed to gitea/cx)
put point on the commit you want; did this in sequence
A A : cherry pick commit to current branch
minibuffer will show the commit and all branches, leave it on that commit
the final commit was not shown by hash, just the branch cx
since (local) cx was caught up with that branch
** reverting a branch
b l : switch to local branch (cx)
l l : open local reflog
put point on the commit; highlighted remote gitea/cx
X : reset branch; prompts you, selected cx
* giant requests * giant requests
** item: ** item:
get: get:
@@ -223,3 +250,18 @@ python build_observed_products.py
python build_review_queue.py python build_review_queue.py
python build_canonical_layer.py python build_canonical_layer.py
python validate_cross_retailer_flow.py python validate_cross_retailer_flow.py
* t1.11 tasks [2026-03-17 Tue 13:49]
ok i ran a few. time to run some cleanups here - i'm wondering if we shouldn't be less aggressive with canonical names and encourage a better manual process to start.
1. auto-created canonical_names lack category, product_type - ok with filling these in manually in the catalog once the queue is empty
2. canonical_names feel too specific, e.g., "5DZ egg"
3. some canonical_names need consolidation, eg "LIME" and "LIME . / ." ; poss cleanup issue. there are 5 entries for egg but they are all regular large grade A white eggs, just different amounts in dozens.
Eggs are actually a great candidate for the kind of analysis we want to do - the pipeline should have caught and properly sorted these into size/qty:
```canonical_product_id canonical_name category product_type brand variant size_value size_unit pack_qty measure_type notes created_at updated_at
gcan_0e350505fd22 5DZ EGG / / KS each auto-linked via exact_name
gcan_47279a80f5f3 EGG 5 DOZ. BBS each auto-linked via exact_name
gcan_7d099130c1bf LRG WHITE EGG SB 30 count auto-linked via exact_upc
gcan_849c2817e667 GDA LRG WHITE EGG SB 18 count auto-linked via exact_upc
gcan_cb0c6c8cf480 LG EGG CONVENTIONAL 18 count count auto-linked via exact_name_size ```
4. Build costco mechanism for matching discount to line item.
1. Discounts appear as their own line items with a number like /123456, this matches the UPC of the discounted item
2. must be date-matched to the UPC

View File

@@ -367,6 +367,55 @@
- commit: `c7dad54` on branch `cx` - commit: `c7dad54` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python build_purchases.py`; `./venv/bin/python review_products.py --refresh-only`; verified `combined_output/review_queue.csv`, `combined_output/review_resolutions.csv` workflow, and `combined_output/canonical_catalog.csv` - tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python build_purchases.py`; `./venv/bin/python review_products.py --refresh-only`; verified `combined_output/review_queue.csv`, `combined_output/review_resolutions.csv` workflow, and `combined_output/canonical_catalog.csv`
- date: 2026-03-16 - date: 2026-03-16
* [X] t1.12: simplify review process display
Clearly show current state separate from proposed future state.
** acceptance criteria
1. Display position in review queue, e.g., (1/22)
2. Display compact header with observed_product under review, queue position, and canonical decision, e.g.: "Resolve [n] observed product group [name] and associated items to canonical_name [name]? (\n [n] matched items)"
3. color-code outputs based on info, input/prompt, warning/error
1. color action menu/requests for input differently from display text; do not color individual options separately
2. "no canonical_name suggestions found" is informational, not a warning/error.
4. update action menu `[x]exclude` to `e[x]clude`
5. on each review item, display a list of all matched items to be linked, sorted by descending date:
1. YYYY-mm-dd, price, raw item name, normalized item name, upc, retailer
2. image URL, if exists
3. Sample:
6. on each review item, suggest (but do not auto-apply) up to 3 likely existing canonicals using deterministic rules, e.g:
1. exact normalized name match
2. prefix/contains match on canonical name
3. exact UPC
7. Sample Entry:
#+begin_comment
Review 7/22: Resolve observed_product MIXED PEPPER to canonical_name [__]?
2 matched items:
[1] 2026-03-12 | 7.49 | MIXED PEPPER 6-PACK | MIXED PEPPER | [upc] | costco | [img_url]
[2] [YYYY-mm-dd] | [price] | [raw_name] | [observed_name] | [upc] | [retailer] | [img_url]
2 canonical suggestions found:
[1] BELL PEPPERS, PRODUCE
[2] PEPPER, SPICES
#+end_comment
8. When link is selected, users should be able to select the number of the item in the list, e.g.:
#+begin_comment
Select the canonical_name to associate [n] items with:
[1] GRB GRADU PCH PUF1. | gcan_01b0d623aa02
[2] BTB CHICKEN | gcan_0201f0feb749
[3] LIME | gcan_02074d9e7359
#+end_comment
9. Add confirmation to link selection with instructions, "[n] [observed_name] and future observed_name matches will be associated with [canonical_name], is this ok?"
actions: [Y]es [n]o [b]ack [s]kip [q]uit
- reinforce project terminology such as raw_name, observed_name, canonical_name
** evidence
- commit: `7b8141c`, `d39497c`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python -m unittest tests.test_review_workflow tests.test_purchases`; `./venv/bin/python review_products.py --help`; verified compact review header, numbered matched-item display, informational no-suggestion state, numbered canonical selection, and confirmation flow
- date: 2026-03-17
** notes
- The key improvement was shifting the prompt from system metadata to reviewer intent: one observed_product, its matched retailer rows, and one canonical_name decision.
- Numbered canonical selection plus confirmation worked better than free-text id entry and should reduce accidental links.
- Deterministic suggestions remain intentionally conservative; they speed up common cases, but unresolved items still depend on human review by design.
* [ ] t1.10: add optional llm-assisted suggestion workflow for unresolved products (2-4 commits) * [ ] t1.10: add optional llm-assisted suggestion workflow for unresolved products (2-4 commits)
** acceptance criteria ** acceptance criteria

View File

@@ -1,6 +1,5 @@
from collections import defaultdict from collections import defaultdict
from datetime import date from datetime import date
from pathlib import Path
import click import click
@@ -99,17 +98,175 @@ def save_catalog_rows(path, rows):
write_csv_rows(path, rows, build_purchases.CATALOG_FIELDS) write_csv_rows(path, rows, build_purchases.CATALOG_FIELDS)
def prompt_resolution(queue_row, catalog_rows): INFO_COLOR = "cyan"
PROMPT_COLOR = "bright_yellow"
WARNING_COLOR = "magenta"
def sort_related_items(rows):
return sorted(
rows,
key=lambda row: (
row.get("purchase_date", ""),
row.get("order_id", ""),
int(row.get("line_no", "0") or "0"),
),
reverse=True,
)
def build_canonical_suggestions(related_rows, catalog_rows, limit=3):
normalized_names = {
row.get("normalized_item_name", "").strip().upper()
for row in related_rows
if row.get("normalized_item_name", "").strip()
}
upcs = {
row.get("upc", "").strip()
for row in related_rows
if row.get("upc", "").strip()
}
suggestions = []
seen_ids = set()
def add_matches(rows, reason):
for row in rows:
canonical_product_id = row.get("canonical_product_id", "")
if not canonical_product_id or canonical_product_id in seen_ids:
continue
seen_ids.add(canonical_product_id)
suggestions.append(
{
"canonical_product_id": canonical_product_id,
"canonical_name": row.get("canonical_name", ""),
"reason": reason,
}
)
if len(suggestions) >= limit:
return True
return False
exact_upc_rows = [
row
for row in catalog_rows
if row.get("upc", "").strip() and row.get("upc", "").strip() in upcs
]
if add_matches(exact_upc_rows, "exact upc"):
return suggestions
exact_name_rows = [
row
for row in catalog_rows
if row.get("canonical_name", "").strip().upper() in normalized_names
]
if add_matches(exact_name_rows, "exact normalized name"):
return suggestions
contains_rows = []
for row in catalog_rows:
canonical_name = row.get("canonical_name", "").strip().upper()
if not canonical_name:
continue
for normalized_name in normalized_names:
if normalized_name in canonical_name or canonical_name in normalized_name:
contains_rows.append(row)
break
add_matches(contains_rows, "canonical name contains match")
return suggestions
def build_display_lines(queue_row, related_rows):
lines = []
for index, row in enumerate(sort_related_items(related_rows), start=1):
lines.append(
" [{index}] {purchase_date} | {line_total} | {raw_item_name} | {normalized_item_name} | "
"{upc} | {retailer}".format(
index=index,
purchase_date=row.get("purchase_date", ""),
line_total=row.get("line_total", ""),
raw_item_name=row.get("raw_item_name", ""),
normalized_item_name=row.get("normalized_item_name", ""),
upc=row.get("upc", ""),
retailer=row.get("retailer", ""),
)
)
if row.get("image_url"):
lines.append(f" {row['image_url']}")
if not lines:
lines.append(" [1] no matched item rows found")
return lines
def observed_name(queue_row, related_rows):
if queue_row.get("normalized_names"):
return queue_row["normalized_names"].split(" | ")[0]
for row in related_rows:
if row.get("normalized_item_name"):
return row["normalized_item_name"]
return queue_row.get("observed_product_id", "")
def choose_existing_canonical(display_rows, observed_label, matched_count):
click.secho(
f"Select the canonical_name to associate {matched_count} items with:",
fg=INFO_COLOR,
)
for index, row in enumerate(display_rows, start=1):
click.echo(f" [{index}] {row['canonical_name']} | {row['canonical_product_id']}")
choice = click.prompt(
click.style("selection", fg=PROMPT_COLOR),
type=click.IntRange(1, len(display_rows)),
)
chosen_row = display_rows[choice - 1]
click.echo(
f'{matched_count} "{observed_label}" items and future matches will be associated '
f'with "{chosen_row["canonical_name"]}".'
)
click.secho(
"actions: [y]es [n]o [b]ack [s]kip [q]uit",
fg=PROMPT_COLOR,
)
confirm = click.prompt(
click.style("confirm", fg=PROMPT_COLOR),
type=click.Choice(["y", "n", "b", "s", "q"]),
)
if confirm == "y":
return chosen_row["canonical_product_id"], ""
if confirm == "s":
return "", "skip"
if confirm == "q":
return "", "quit"
return "", "back"
def prompt_resolution(queue_row, related_rows, catalog_rows, queue_index, queue_total):
suggestions = build_canonical_suggestions(related_rows, catalog_rows)
observed_label = observed_name(queue_row, related_rows)
matched_count = len(related_rows)
click.echo("") click.echo("")
click.echo(f"observed_product_id: {queue_row['observed_product_id']}") click.secho(
click.echo(f"retailer: {queue_row['retailer']}") f"Review {queue_index}/{queue_total}: Resolve observed_product {observed_label} "
click.echo(f"raw names: {queue_row['raw_item_names']}") "to canonical_name [__]?",
click.echo(f"normalized names: {queue_row['normalized_names']}") fg=INFO_COLOR,
click.echo(f"upcs: {queue_row['upc_values']}") )
click.echo(f"example prices: {queue_row['example_prices']}") click.echo(f"{matched_count} matched items:")
click.echo(f"seen count: {queue_row['seen_count']}") for line in build_display_lines(queue_row, related_rows):
click.echo("actions: [l]ink existing [n]ew canonical [x]exclude [s]kip [q]uit") click.echo(line)
action = click.prompt("action", type=click.Choice(["l", "n", "x", "s", "q"])) if suggestions:
click.echo(f"{len(suggestions)} canonical suggestions found:")
for index, suggestion in enumerate(suggestions, start=1):
click.echo(f" [{index}] {suggestion['canonical_name']}")
else:
click.echo("no canonical_name suggestions found")
click.secho(
"[l]ink existing [n]ew canonical e[x]clude [s]kip [q]uit:",
fg=PROMPT_COLOR,
)
action = click.prompt(
"",
type=click.Choice(["l", "n", "x", "s", "q"]),
prompt_suffix=" ",
)
if action == "q": if action == "q":
return None, None return None, None
if action == "s": if action == "s":
@@ -122,7 +279,11 @@ def prompt_resolution(queue_row, catalog_rows):
"reviewed_at": str(date.today()), "reviewed_at": str(date.today()),
}, None }, None
if action == "x": if action == "x":
notes = click.prompt("exclude notes", default="", show_default=False) notes = click.prompt(
click.style("exclude notes", fg=PROMPT_COLOR),
default="",
show_default=False,
)
return { return {
"observed_product_id": queue_row["observed_product_id"], "observed_product_id": queue_row["observed_product_id"],
"canonical_product_id": "", "canonical_product_id": "",
@@ -132,11 +293,35 @@ def prompt_resolution(queue_row, catalog_rows):
"reviewed_at": str(date.today()), "reviewed_at": str(date.today()),
}, None }, None
if action == "l": if action == "l":
click.echo("existing canonicals:") display_rows = suggestions or [
for row in catalog_rows[:10]: {
click.echo(f" {row['canonical_product_id']} {row['canonical_name']}") "canonical_product_id": row["canonical_product_id"],
canonical_product_id = click.prompt("canonical product id", type=str) "canonical_name": row["canonical_name"],
notes = click.prompt("link notes", default="", show_default=False) "reason": "catalog sample",
}
for row in catalog_rows[:10]
]
while True:
canonical_product_id, outcome = choose_existing_canonical(
display_rows,
observed_label,
matched_count,
)
if outcome == "skip":
return {
"observed_product_id": queue_row["observed_product_id"],
"canonical_product_id": "",
"resolution_action": "skip",
"status": "pending",
"resolution_notes": queue_row.get("resolution_notes", ""),
"reviewed_at": str(date.today()),
}, None
if outcome == "quit":
return None, None
if outcome == "back":
continue
break
notes = click.prompt(click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False)
return { return {
"observed_product_id": queue_row["observed_product_id"], "observed_product_id": queue_row["observed_product_id"],
"canonical_product_id": canonical_product_id, "canonical_product_id": canonical_product_id,
@@ -146,10 +331,22 @@ def prompt_resolution(queue_row, catalog_rows):
"reviewed_at": str(date.today()), "reviewed_at": str(date.today()),
}, None }, None
canonical_name = click.prompt("canonical name", type=str) canonical_name = click.prompt(click.style("canonical name", fg=PROMPT_COLOR), type=str)
category = click.prompt("category", default="", show_default=False) category = click.prompt(
product_type = click.prompt("product type", default="", show_default=False) click.style("category", fg=PROMPT_COLOR),
notes = click.prompt("notes", default="", show_default=False) default="",
show_default=False,
)
product_type = click.prompt(
click.style("product type", fg=PROMPT_COLOR),
default="",
show_default=False,
)
notes = click.prompt(
click.style("notes", fg=PROMPT_COLOR),
default="",
show_default=False,
)
canonical_product_id = stable_id("gcan", f"manual|{canonical_name}|{category}|{product_type}") canonical_product_id = stable_id("gcan", f"manual|{canonical_name}|{category}|{product_type}")
canonical_row = { canonical_row = {
"canonical_product_id": canonical_product_id, "canonical_product_id": canonical_product_id,
@@ -197,11 +394,17 @@ def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, limit, refresh_
resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows) resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows)
catalog_by_id = {row["canonical_product_id"]: row for row in catalog_rows if row.get("canonical_product_id")} catalog_by_id = {row["canonical_product_id"]: row for row in catalog_rows if row.get("canonical_product_id")}
rows_by_observed = defaultdict(list)
for row in purchase_rows:
observed_product_id = row.get("observed_product_id", "")
if observed_product_id:
rows_by_observed[observed_product_id].append(row)
reviewed = 0 reviewed = 0
for queue_row in queue_rows: for index, queue_row in enumerate(queue_rows, start=1):
if limit and reviewed >= limit: if limit and reviewed >= limit:
break break
result = prompt_resolution(queue_row, catalog_rows) related_rows = rows_by_observed.get(queue_row["observed_product_id"], [])
result = prompt_resolution(queue_row, related_rows, catalog_rows, index, len(queue_rows))
if result == (None, None): if result == (None, None):
break break
resolution_row, canonical_row = result resolution_row, canonical_row = result

View File

@@ -41,6 +41,7 @@ class PurchaseLogTests(unittest.TestCase):
"order_date": "2026-03-01", "order_date": "2026-03-01",
"item_name": "FRESH BANANA", "item_name": "FRESH BANANA",
"item_name_norm": "BANANA", "item_name_norm": "BANANA",
"image_url": "https://example.test/banana.jpg",
"retailer_item_id": "100", "retailer_item_id": "100",
"upc": "4011", "upc": "4011",
"qty": "1", "qty": "1",
@@ -99,24 +100,18 @@ class PurchaseLogTests(unittest.TestCase):
} }
] ]
<<<<<<< HEAD
rows, _observed, _canon, _links = build_purchases.build_purchase_rows( rows, _observed, _canon, _links = build_purchases.build_purchase_rows(
=======
rows = build_purchases.build_purchase_rows(
>>>>>>> be1bf63 (Build pivot-ready purchase log)
[giant_row], [giant_row],
[costco_row], [costco_row],
giant_orders, giant_orders,
costco_orders, costco_orders,
<<<<<<< HEAD
[], [],
=======
>>>>>>> be1bf63 (Build pivot-ready purchase log)
) )
self.assertEqual(2, len(rows)) self.assertEqual(2, len(rows))
self.assertTrue(all(row["canonical_product_id"] for row in rows)) self.assertTrue(all(row["canonical_product_id"] for row in rows))
self.assertEqual({"giant", "costco"}, {row["retailer"] for row in rows}) self.assertEqual({"giant", "costco"}, {row["retailer"] for row in rows})
self.assertEqual("https://example.test/banana.jpg", rows[0]["image_url"])
def test_main_writes_purchase_and_example_csvs(self): def test_main_writes_purchase_and_example_csvs(self):
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
@@ -124,11 +119,13 @@ class PurchaseLogTests(unittest.TestCase):
costco_items = Path(tmpdir) / "costco_items.csv" costco_items = Path(tmpdir) / "costco_items.csv"
giant_orders = Path(tmpdir) / "giant_orders.csv" giant_orders = Path(tmpdir) / "giant_orders.csv"
costco_orders = Path(tmpdir) / "costco_orders.csv" costco_orders = Path(tmpdir) / "costco_orders.csv"
resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
links_csv = Path(tmpdir) / "product_links.csv"
purchases_csv = Path(tmpdir) / "combined" / "purchases.csv" purchases_csv = Path(tmpdir) / "combined" / "purchases.csv"
examples_csv = Path(tmpdir) / "combined" / "comparison_examples.csv" examples_csv = Path(tmpdir) / "combined" / "comparison_examples.csv"
fieldnames = enrich_costco.OUTPUT_FIELDS fieldnames = enrich_costco.OUTPUT_FIELDS
rows = []
giant_row = {field: "" for field in fieldnames} giant_row = {field: "" for field in fieldnames}
giant_row.update( giant_row.update(
{ {
@@ -178,7 +175,6 @@ class PurchaseLogTests(unittest.TestCase):
"is_fee": "false", "is_fee": "false",
} }
) )
rows.extend([giant_row, costco_row])
for path, source_rows in [ for path, source_rows in [
(giant_items, [giant_row]), (giant_items, [giant_row]),
@@ -189,12 +185,35 @@ class PurchaseLogTests(unittest.TestCase):
writer.writeheader() writer.writeheader()
writer.writerows(source_rows) writer.writerows(source_rows)
order_fields = ["order_id", "store_name", "store_number", "store_city", "store_state"]
for path, source_rows in [ for path, source_rows in [
(giant_orders, [{"order_id": "g1", "store_name": "Giant", "store_number": "42", "store_city": "Springfield", "store_state": "VA"}]), (
(costco_orders, [{"order_id": "c1", "store_name": "MT VERNON", "store_number": "1115", "store_city": "ALEXANDRIA", "store_state": "VA"}]), giant_orders,
[
{
"order_id": "g1",
"store_name": "Giant",
"store_number": "42",
"store_city": "Springfield",
"store_state": "VA",
}
],
),
(
costco_orders,
[
{
"order_id": "c1",
"store_name": "MT VERNON",
"store_number": "1115",
"store_city": "ALEXANDRIA",
"store_state": "VA",
}
],
),
]: ]:
with path.open("w", newline="", encoding="utf-8") as handle: with path.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=["order_id", "store_name", "store_number", "store_city", "store_state"]) writer = csv.DictWriter(handle, fieldnames=order_fields)
writer.writeheader() writer.writeheader()
writer.writerows(source_rows) writer.writerows(source_rows)
@@ -203,12 +222,9 @@ class PurchaseLogTests(unittest.TestCase):
costco_items_enriched_csv=str(costco_items), costco_items_enriched_csv=str(costco_items),
giant_orders_csv=str(giant_orders), giant_orders_csv=str(giant_orders),
costco_orders_csv=str(costco_orders), costco_orders_csv=str(costco_orders),
<<<<<<< HEAD resolutions_csv=str(resolutions_csv),
resolutions_csv=str(Path(tmpdir) / "review_resolutions.csv"), catalog_csv=str(catalog_csv),
catalog_csv=str(Path(tmpdir) / "canonical_catalog.csv"), links_csv=str(links_csv),
links_csv=str(Path(tmpdir) / "product_links.csv"),
=======
>>>>>>> be1bf63 (Build pivot-ready purchase log)
output_csv=str(purchases_csv), output_csv=str(purchases_csv),
examples_csv=str(examples_csv), examples_csv=str(examples_csv),
) )
@@ -222,7 +238,6 @@ class PurchaseLogTests(unittest.TestCase):
self.assertEqual(2, len(purchase_rows)) self.assertEqual(2, len(purchase_rows))
self.assertEqual(1, len(example_rows)) self.assertEqual(1, len(example_rows))
<<<<<<< HEAD
def test_build_purchase_rows_applies_manual_resolution(self): def test_build_purchase_rows_applies_manual_resolution(self):
fieldnames = enrich_costco.OUTPUT_FIELDS fieldnames = enrich_costco.OUTPUT_FIELDS
giant_row = {field: "" for field in fieldnames} giant_row = {field: "" for field in fieldnames}
@@ -255,7 +270,15 @@ class PurchaseLogTests(unittest.TestCase):
rows, _observed, _canon, _links = build_purchases.build_purchase_rows( rows, _observed, _canon, _links = build_purchases.build_purchase_rows(
[giant_row], [giant_row],
[], [],
[{"order_id": "g1", "store_name": "Giant", "store_number": "42", "store_city": "Springfield", "store_state": "VA"}], [
{
"order_id": "g1",
"store_name": "Giant",
"store_number": "42",
"store_city": "Springfield",
"store_state": "VA",
}
],
[], [],
[ [
{ {
@@ -273,8 +296,6 @@ class PurchaseLogTests(unittest.TestCase):
self.assertEqual("approved", rows[0]["review_status"]) self.assertEqual("approved", rows[0]["review_status"])
self.assertEqual("create", rows[0]["resolution_action"]) self.assertEqual("create", rows[0]["resolution_action"])
=======
>>>>>>> be1bf63 (Build pivot-ready purchase log)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -4,6 +4,8 @@ import unittest
from pathlib import Path from pathlib import Path
from unittest import mock from unittest import mock
from click.testing import CliRunner
import review_products import review_products
@@ -37,6 +39,305 @@ class ReviewWorkflowTests(unittest.TestCase):
self.assertEqual("gobs_1", queue_rows[0]["observed_product_id"]) self.assertEqual("gobs_1", queue_rows[0]["observed_product_id"])
self.assertIn("SB BAGGED ICE 20LB", queue_rows[0]["raw_item_names"]) self.assertIn("SB BAGGED ICE 20LB", queue_rows[0]["raw_item_names"])
def test_build_canonical_suggestions_prefers_upc_then_name(self):
suggestions = review_products.build_canonical_suggestions(
[
{
"normalized_item_name": "MIXED PEPPER",
"upc": "12345",
}
],
[
{
"canonical_product_id": "gcan_1",
"canonical_name": "MIXED PEPPER",
"upc": "",
},
{
"canonical_product_id": "gcan_2",
"canonical_name": "MIXED PEPPER 6 PACK",
"upc": "12345",
},
],
)
self.assertEqual("gcan_2", suggestions[0]["canonical_product_id"])
self.assertEqual("exact upc", suggestions[0]["reason"])
self.assertEqual("gcan_1", suggestions[1]["canonical_product_id"])
def test_review_products_displays_position_items_and_suggestions(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"
queue_csv = Path(tmpdir) / "review_queue.csv"
resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
purchase_fields = [
"purchase_date",
"retailer",
"order_id",
"line_no",
"observed_product_id",
"canonical_product_id",
"raw_item_name",
"normalized_item_name",
"image_url",
"upc",
"line_total",
]
with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=purchase_fields)
writer.writeheader()
writer.writerows(
[
{
"purchase_date": "2026-03-14",
"retailer": "costco",
"order_id": "c2",
"line_no": "2",
"observed_product_id": "gobs_mix",
"canonical_product_id": "",
"raw_item_name": "MIXED PEPPER 6-PACK",
"normalized_item_name": "MIXED PEPPER",
"image_url": "",
"upc": "",
"line_total": "7.49",
},
{
"purchase_date": "2026-03-12",
"retailer": "costco",
"order_id": "c1",
"line_no": "1",
"observed_product_id": "gobs_mix",
"canonical_product_id": "",
"raw_item_name": "MIXED PEPPER 6-PACK",
"normalized_item_name": "MIXED PEPPER",
"image_url": "https://example.test/mixed-pepper.jpg",
"upc": "",
"line_total": "6.99",
},
]
)
with catalog_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=review_products.build_purchases.CATALOG_FIELDS)
writer.writeheader()
writer.writerow(
{
"canonical_product_id": "gcan_mix",
"canonical_name": "MIXED PEPPER",
"category": "produce",
"product_type": "pepper",
"brand": "",
"variant": "",
"size_value": "",
"size_unit": "",
"pack_qty": "",
"measure_type": "",
"notes": "",
"created_at": "",
"updated_at": "",
}
)
runner = CliRunner()
result = runner.invoke(
review_products.main,
[
"--purchases-csv",
str(purchases_csv),
"--queue-csv",
str(queue_csv),
"--resolutions-csv",
str(resolutions_csv),
"--catalog-csv",
str(catalog_csv),
],
input="q\n",
color=True,
)
self.assertEqual(0, result.exit_code)
self.assertIn("Review 1/1: Resolve observed_product MIXED PEPPER to canonical_name [__]?", result.output)
self.assertIn("2 matched items:", result.output)
self.assertIn("[l]ink existing [n]ew canonical e[x]clude [s]kip [q]uit:", result.output)
first_item = result.output.index("[1] 2026-03-14 | 7.49")
second_item = result.output.index("[2] 2026-03-12 | 6.99")
self.assertLess(first_item, second_item)
self.assertIn("https://example.test/mixed-pepper.jpg", result.output)
self.assertIn("1 canonical suggestions found:", result.output)
self.assertIn("[1] MIXED PEPPER", result.output)
self.assertIn("\x1b[", result.output)
def test_review_products_no_suggestions_is_informational(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"
queue_csv = Path(tmpdir) / "review_queue.csv"
resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(
handle,
fieldnames=[
"purchase_date",
"retailer",
"order_id",
"line_no",
"observed_product_id",
"canonical_product_id",
"raw_item_name",
"normalized_item_name",
"image_url",
"upc",
"line_total",
],
)
writer.writeheader()
writer.writerow(
{
"purchase_date": "2026-03-14",
"retailer": "giant",
"order_id": "g1",
"line_no": "1",
"observed_product_id": "gobs_ice",
"canonical_product_id": "",
"raw_item_name": "SB BAGGED ICE 20LB",
"normalized_item_name": "BAGGED ICE",
"image_url": "",
"upc": "",
"line_total": "3.50",
}
)
with catalog_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=review_products.build_purchases.CATALOG_FIELDS)
writer.writeheader()
result = CliRunner().invoke(
review_products.main,
[
"--purchases-csv",
str(purchases_csv),
"--queue-csv",
str(queue_csv),
"--resolutions-csv",
str(resolutions_csv),
"--catalog-csv",
str(catalog_csv),
],
input="q\n",
color=True,
)
self.assertEqual(0, result.exit_code)
self.assertIn("no canonical_name suggestions found", result.output)
def test_link_existing_uses_numbered_selection_and_confirmation(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"
queue_csv = Path(tmpdir) / "review_queue.csv"
resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(
handle,
fieldnames=[
"purchase_date",
"retailer",
"order_id",
"line_no",
"observed_product_id",
"canonical_product_id",
"raw_item_name",
"normalized_item_name",
"image_url",
"upc",
"line_total",
],
)
writer.writeheader()
writer.writerows(
[
{
"purchase_date": "2026-03-14",
"retailer": "costco",
"order_id": "c2",
"line_no": "2",
"observed_product_id": "gobs_mix",
"canonical_product_id": "",
"raw_item_name": "MIXED PEPPER 6-PACK",
"normalized_item_name": "MIXED PEPPER",
"image_url": "",
"upc": "",
"line_total": "7.49",
},
{
"purchase_date": "2026-03-12",
"retailer": "costco",
"order_id": "c1",
"line_no": "1",
"observed_product_id": "gobs_mix",
"canonical_product_id": "",
"raw_item_name": "MIXED PEPPER 6-PACK",
"normalized_item_name": "MIXED PEPPER",
"image_url": "",
"upc": "",
"line_total": "6.99",
},
]
)
with catalog_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=review_products.build_purchases.CATALOG_FIELDS)
writer.writeheader()
writer.writerow(
{
"canonical_product_id": "gcan_mix",
"canonical_name": "MIXED PEPPER",
"category": "",
"product_type": "",
"brand": "",
"variant": "",
"size_value": "",
"size_unit": "",
"pack_qty": "",
"measure_type": "",
"notes": "",
"created_at": "",
"updated_at": "",
}
)
result = CliRunner().invoke(
review_products.main,
[
"--purchases-csv",
str(purchases_csv),
"--queue-csv",
str(queue_csv),
"--resolutions-csv",
str(resolutions_csv),
"--catalog-csv",
str(catalog_csv),
"--limit",
"1",
],
input="l\n1\ny\nlinked by test\n",
color=True,
)
self.assertEqual(0, result.exit_code)
self.assertIn("Select the canonical_name to associate 2 items with:", result.output)
self.assertIn('[1] MIXED PEPPER | gcan_mix', result.output)
self.assertIn('2 "MIXED PEPPER" items and future matches will be associated with "MIXED PEPPER".', result.output)
self.assertIn("actions: [y]es [n]o [b]ack [s]kip [q]uit", result.output)
with resolutions_csv.open(newline="", encoding="utf-8") as handle:
rows = list(csv.DictReader(handle))
self.assertEqual("gcan_mix", rows[0]["canonical_product_id"])
self.assertEqual("link", rows[0]["resolution_action"])
def test_review_products_creates_canonical_and_resolution(self): def test_review_products_creates_canonical_and_resolution(self):
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv" purchases_csv = Path(tmpdir) / "purchases.csv"
@@ -48,25 +349,33 @@ class ReviewWorkflowTests(unittest.TestCase):
writer = csv.DictWriter( writer = csv.DictWriter(
handle, handle,
fieldnames=[ fieldnames=[
"purchase_date",
"observed_product_id", "observed_product_id",
"canonical_product_id", "canonical_product_id",
"retailer", "retailer",
"raw_item_name", "raw_item_name",
"normalized_item_name", "normalized_item_name",
"image_url",
"upc", "upc",
"line_total", "line_total",
"order_id",
"line_no",
], ],
) )
writer.writeheader() writer.writeheader()
writer.writerow( writer.writerow(
{ {
"purchase_date": "2026-03-15",
"observed_product_id": "gobs_ice", "observed_product_id": "gobs_ice",
"canonical_product_id": "", "canonical_product_id": "",
"retailer": "giant", "retailer": "giant",
"raw_item_name": "SB BAGGED ICE 20LB", "raw_item_name": "SB BAGGED ICE 20LB",
"normalized_item_name": "BAGGED ICE", "normalized_item_name": "BAGGED ICE",
"image_url": "",
"upc": "", "upc": "",
"line_total": "3.50", "line_total": "3.50",
"order_id": "g1",
"line_no": "1",
} }
) )