Compare commits

23 Commits

Author SHA1 Message Date
ben
eddef7de2b updated readme and prep for next phase 2026-03-17 13:59:57 -04:00
ben
83bc6c4a7c Update t1.12 task evidence 2026-03-17 13:25:21 -04:00
ben
d39497c298 Refine product review prompt flow 2026-03-17 13:25:12 -04:00
ben
7b8141cd42 Improve product review display workflow 2026-03-17 12:25:47 -04:00
ben
e494386e64 build_purchases rev1 2026-03-17 12:21:44 -04:00
ben
7527fe37eb added git notes 2026-03-17 12:21:24 -04:00
ben
a1fafa3885 added t1.12 scope to simplify review process 2026-03-17 12:20:48 -04:00
ben
37b2196023 added git notes 2026-03-17 09:23:00 -04:00
ben
7f8c3ed8eb updated readme with Review steps 2026-03-17 09:14:14 -04:00
ben
91bfd3597e Record t1.11 task evidence 2026-03-16 20:45:57 -04:00
ben
c7dad5489e Add terminal review resolution workflow 2026-03-16 20:45:37 -04:00
ben
34eedff9c5 Record t1.8.7 and t1.9 task evidence 2026-03-16 18:01:16 -04:00
ben
be1bf6328e Build pivot-ready purchase log 2026-03-16 18:01:09 -04:00
ben
6806c0e7ff updated readme 2026-03-16 17:40:23 -04:00
ben
861955557a added instructions 2026-03-16 17:34:22 -04:00
ben
6e1cde2c83 fix json data pull from /raw 2026-03-16 17:34:01 -04:00
ben
23d0c7e5cd fix bug w session.headers.update missing auth_headers 2026-03-16 17:19:07 -04:00
ben
9a985bf98d updated to use .env, then pull idToken and clientID 2026-03-16 17:17:20 -04:00
ben
b0d4044dac updated task 1.8.7 2026-03-16 17:09:13 -04:00
ben
d7a0329332 Simplify browser session bootstrap 2026-03-16 17:08:44 -04:00
e48dd6c4c2 troubleshooting costco header extraction 2026-03-16 16:59:31 -04:00
ben
1b4c7dde25 Simplify Costco browser header extraction 2026-03-16 16:23:38 -04:00
5a331c9af4 fixed sqlite copy permission error 2026-03-16 16:18:50 -04:00
15 changed files with 2167 additions and 351 deletions

158
README.md

@@ -1,103 +1,113 @@
# scrape-giant # scrape-giant
Small grocery-history pipeline for Giant receipts. CLI to pull purchase history from Giant and Costco websites and refine into a single product catalog for external analysis.
The project currently does four things: Run each script step-by-step from the terminal.
1. scrape Giant in-store order history from an active Firefox session ## What It Does
2. enrich raw line items into a deterministic `items_enriched.csv`
3. aggregate retailer-facing observed products and build a manual review queue
4. create a first-pass canonical product layer plus conservative auto-links
The work so far is Giant-specific on the ingest side and intentionally simple on 1. `scrape_giant.py`: download Giant orders and items
the shared product-model side. 2. `enrich_giant.py`: normalize Giant line items
3. `scrape_costco.py`: download Costco orders and items
4. `enrich_costco.py`: normalize Costco line items
5. `build_purchases.py`: combine retailer outputs into one purchase table
6. `review_products.py`: review unresolved product matches in the terminal
## Current flow ## Requirements
Run the commands from the repo root with the project venv active, or call them - Python 3.10+
directly through `./venv/bin/python`. - Firefox installed with active Giant and Costco sessions
## Install
```bash ```bash
./venv/bin/python scraper.py python -m venv venv
./venv/bin/python enrich_giant.py ./venv/scripts/activate
./venv/bin/python build_observed_products.py pip install -r requirements.txt
./venv/bin/python build_review_queue.py
./venv/bin/python build_canonical_layer.py
``` ```
## Inputs ## Optional `.env`
- Firefox cookies for `giantfood.com` Current version works best with `.env` in the project root. The scraper will prompt for these values if they are not found in the current browser session.
- `GIANT_USER_ID` and `GIANT_LOYALTY_NUMBER` in `.env`, shell env, or prompts - `scrape_giant` prompts if `GIANT_USER_ID` or `GIANT_LOYALTY_NUMBER` is missing.
- Giant raw order payloads in `giant_output/raw/` - `scrape_costco` tries `.env` first, then Firefox local storage for session-backed values; `COSTCO_CLIENT_IDENTIFIER` should still be set explicitly.
## Outputs ```env
GIANT_USER_ID=...
GIANT_LOYALTY_NUMBER=...
Current generated files live under `giant_output/`: COSTCO_X_AUTHORIZATION=...
COSTCO_X_WCS_CLIENTID=...
COSTCO_CLIENT_IDENTIFIER=...
```
- `orders.csv`: flattened visit/order rows from the Giant history API ## Run Order
- `items.csv`: flattened raw line items from fetched order detail payloads
- `items_enriched.csv`: deterministic parsed/enriched line items
- `products_observed.csv`: retailer-facing observed product groups
- `review_queue.csv`: products needing manual review
- `products_canonical.csv`: shared canonical product rows
- `product_links.csv`: observed-to-canonical links
Raw json remains the source of truth: Run the pipeline in this order:
- `giant_output/raw/history.json` ```bash
- `giant_output/raw/<order_id>.json` python scrape_giant.py
python enrich_giant.py
python scrape_costco.py
python enrich_costco.py
python build_purchases.py
python review_products.py
python build_purchases.py
```
## Scripts Why run `build_purchases.py` twice:
- first pass builds the current combined dataset and review queue inputs
- `review_products.py` writes durable review decisions
- second pass reapplies those decisions into the purchase output
- `scraper.py`: fetches Giant history/detail payloads and updates `orders.csv` and `items.csv` If you only want to refresh the queue without reviewing interactively:
- `enrich_giant.py`: reads raw Giant order json and writes `items_enriched.csv`
- `build_observed_products.py`: groups enriched rows into `products_observed.csv`
- `build_review_queue.py`: generates `review_queue.csv` and preserves review status on reruns
- `build_canonical_layer.py`: builds `products_canonical.csv` and `product_links.csv`
## Notes on the current model ```bash
python review_products.py --refresh-only
```
- Observed products are retailer-specific: Giant, Costco. ## Key Outputs
- Canonical products are the first cross-retailer layer.
- Auto-linking is conservative:
exact UPC first, then exact normalized name plus exact size/unit context, then
exact normalized name when there is no size context to conflict.
- Fee rows are excluded from auto-linking.
- Unknown values are left blank instead of guessed.
## Verification Giant:
- `giant_output/orders.csv`
- `giant_output/items.csv`
- `giant_output/items_enriched.csv`
Run the test suite with: Costco:
- `costco_output/orders.csv`
- `costco_output/items.csv`
- `costco_output/items_enriched.csv`
Combined:
- `combined_output/purchases.csv`
- `combined_output/review_queue.csv`
- `combined_output/review_resolutions.csv`
- `combined_output/canonical_catalog.csv`
- `combined_output/product_links.csv`
- `combined_output/comparison_examples.csv`
## Review Workflow
Run `review_products.py` to clean up unresolved or weakly unified items:
- link an item to an existing canonical product
- create a new canonical product
- exclude an item
- skip it for later
Decisions are saved and reused on later runs.
## Notes
- This project is designed around fragile retailer scraping flows, so the code favors explicit retailer-specific steps over heavy abstraction.
- `scrape_giant.py` and `scrape_costco.py` are meant to work as standalone acquisition scripts.
- `validate_cross_retailer_flow.py` is a proof/check script, not a required production step.
## Test
```bash ```bash
./venv/bin/python -m unittest discover -s tests ./venv/bin/python -m unittest discover -s tests
``` ```
Useful one-off rebuilds: ## Project Docs
```bash - `pm/tasks.org`: task tracking
./venv/bin/python enrich_giant.py - `pm/data-model.org`: current data model notes
./venv/bin/python build_observed_products.py - `pm/review-workflow.org`: review and resolution workflow
./venv/bin/python build_review_queue.py
./venv/bin/python build_canonical_layer.py
```
## Project docs
- `pm/tasks.org`: task log and evidence
- `pm/data-model.org`: file layout and schema decisions
## Status
Completed through `t1.7`:
- Giant receipt fetch CLI
- data model and file layout
- Giant parser/enricher
- observed products
- review queue
- canonical layer scaffold
- conservative auto-link rules
Next planned task is `t1.8`: add a Costco raw ingest path.


@@ -3,44 +3,11 @@ import os
import shutil import shutil
import sqlite3 import sqlite3
import tempfile import tempfile
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
import browser_cookie3 import browser_cookie3
@dataclass
class StorageEntry:
origin: str
key: str
value: str
source: str
@dataclass
class BrowserContext:
cookies: object
storage_entries: list[StorageEntry]
def load_browser_context(
browser,
domain_name,
storage_origins=None,
profile_dir=None,
):
if browser != "firefox":
raise ValueError(f"unsupported browser: {browser}")
profile = Path(profile_dir) if profile_dir else find_firefox_profile_dir()
cookies = load_firefox_cookies(domain_name, profile)
storage_entries = read_firefox_storage_entries(
profile,
origin_filters=storage_origins or [],
)
return BrowserContext(cookies=cookies, storage_entries=storage_entries)
def find_firefox_profile_dir(): def find_firefox_profile_dir():
profiles_ini = firefox_profiles_root() / "profiles.ini" profiles_ini = firefox_profiles_root() / "profiles.ini"
parser = configparser.RawConfigParser() parser = configparser.RawConfigParser()
@@ -87,100 +54,68 @@ def load_firefox_cookies(domain_name, profile_dir):
return browser_cookie3.firefox(cookie_file=str(cookie_file), domain_name=domain_name) return browser_cookie3.firefox(cookie_file=str(cookie_file), domain_name=domain_name)
def read_firefox_storage_entries(profile_dir, origin_filters): def read_firefox_local_storage(profile_dir, origin_filter):
profile_dir = Path(profile_dir)
entries = []
entries.extend(read_firefox_ls_entries(profile_dir, origin_filters))
entries.extend(read_firefox_webapps_entries(profile_dir, origin_filters))
deduped = []
seen = set()
for entry in entries:
key = (entry.origin, entry.key, entry.value, entry.source)
if key in seen:
continue
seen.add(key)
deduped.append(entry)
return deduped
def read_firefox_ls_entries(profile_dir, origin_filters):
entries = []
storage_root = profile_dir / "storage" / "default" storage_root = profile_dir / "storage" / "default"
if not storage_root.exists(): if not storage_root.exists():
return entries return {}
for ls_path in storage_root.glob("*/ls/data.sqlite"): for ls_path in storage_root.glob("*/ls/data.sqlite"):
origin = decode_firefox_origin(ls_path.parents[1].name) origin = decode_firefox_origin(ls_path.parents[1].name)
if not origin_matches(origin, origin_filters): if origin_filter.lower() not in origin.lower():
continue continue
for row in query_sqlite(ls_path, "SELECT key, value FROM data"): return {
entries.append( stringify_sql_value(row[0]): stringify_sql_value(row[1])
StorageEntry( for row in query_sqlite(ls_path, "SELECT key, value FROM data")
origin=origin, }
key=stringify_sql_value(row[0]), return {}
value=stringify_sql_value(row[1]),
source=ls_path.as_posix(),
)
)
return entries
def read_firefox_webapps_entries(profile_dir, origin_filters): def read_firefox_webapps_store(profile_dir, origin_filter):
webapps_path = profile_dir / "webappsstore.sqlite" webapps_path = profile_dir / "webappsstore.sqlite"
if not webapps_path.exists(): if not webapps_path.exists():
return [] return {}
entries = [] values = {}
for row in query_sqlite( for row in query_sqlite(
webapps_path, webapps_path,
"SELECT originKey, key, value FROM webappsstore2", "SELECT originKey, key, value FROM webappsstore2",
): ):
origin = stringify_sql_value(row[0]) origin = stringify_sql_value(row[0])
if not origin_matches(origin, origin_filters): if origin_filter.lower() not in origin.lower():
continue continue
entries.append( values[stringify_sql_value(row[1])] = stringify_sql_value(row[2])
StorageEntry( return values
origin=origin,
key=stringify_sql_value(row[1]),
value=stringify_sql_value(row[2]),
source=webapps_path.as_posix(),
)
)
return entries
def query_sqlite(path, query): def query_sqlite(path, query):
copied_path = copy_sqlite_to_temp(path) copied_path = copy_sqlite_to_temp(path)
connection = None
cursor = None
try: try:
with sqlite3.connect(copied_path) as connection: connection = sqlite3.connect(copied_path)
return list(connection.execute(query)) cursor = connection.cursor()
cursor.execute(query)
rows = cursor.fetchall()
return rows
except sqlite3.OperationalError: except sqlite3.OperationalError:
return [] return []
finally: finally:
if cursor is not None:
cursor.close()
if connection is not None:
connection.close()
copied_path.unlink(missing_ok=True) copied_path.unlink(missing_ok=True)
def copy_sqlite_to_temp(path): def copy_sqlite_to_temp(path):
source_path = Path(path) fd, tmp = tempfile.mkstemp(suffix=".sqlite")
with tempfile.NamedTemporaryFile(delete=False, suffix=source_path.suffix) as handle: os.close(fd)
temp_path = Path(handle.name) shutil.copyfile(path, tmp)
shutil.copy2(source_path, temp_path) return Path(tmp)
return temp_path
def decode_firefox_origin(raw_origin): def decode_firefox_origin(raw_origin):
origin = raw_origin.split("^", 1)[0] origin = raw_origin.split("^", 1)[0]
return origin.replace("+++", "://") return origin.replace("+++", "://")
def origin_matches(origin, origin_filters):
if not origin_filters:
return True
normalized_origin = origin.lower()
return any(filter_value.lower() in normalized_origin for filter_value in origin_filters)
def stringify_sql_value(value): def stringify_sql_value(value):
if value is None: if value is None:
return "" return ""
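For orientation, the origin decoding and the substring filter that the new `read_firefox_local_storage` relies on can be exercised on their own. This is a minimal sketch, assuming Firefox names per-origin storage directories like `https+++www.costco.com`, optionally followed by a `^`-prefixed attribute suffix. `origin_matches_filter` is a hypothetical name for the inline check in the diff, and the sample directory name is illustrative only.

```python
def decode_firefox_origin(raw_origin):
    # Drop any "^..." attribute suffix, then restore the scheme separator.
    origin = raw_origin.split("^", 1)[0]
    return origin.replace("+++", "://")


def origin_matches_filter(origin, origin_filter):
    # Hypothetical helper: mirrors the case-insensitive substring check in the diff.
    return origin_filter.lower() in origin.lower()


# Illustrative directory name, not taken from a real profile.
decoded = decode_firefox_origin("https+++www.costco.com^userContextId=2")
print(decoded)
print(origin_matches_filter(decoded, "costco.com"))
```

Because the filter is a plain substring match, a short filter such as `costco` would also match unrelated origins that happen to contain that text, so a fuller host name is probably the safer choice.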

414
build_purchases.py Normal file

@@ -0,0 +1,414 @@
from decimal import Decimal
from pathlib import Path

import click

import build_canonical_layer
import build_observed_products
import validate_cross_retailer_flow
from enrich_giant import format_decimal, to_decimal
from layer_helpers import read_csv_rows, stable_id, write_csv_rows


PURCHASE_FIELDS = [
    "purchase_date",
    "retailer",
    "order_id",
    "line_no",
    "observed_item_key",
    "observed_product_id",
    "canonical_product_id",
    "review_status",
    "resolution_action",
    "raw_item_name",
    "normalized_item_name",
    "image_url",
    "retailer_item_id",
    "upc",
    "qty",
    "unit",
    "pack_qty",
    "size_value",
    "size_unit",
    "measure_type",
    "line_total",
    "unit_price",
    "store_name",
    "store_number",
    "store_city",
    "store_state",
    "price_per_each",
    "price_per_each_basis",
    "price_per_count",
    "price_per_count_basis",
    "price_per_lb",
    "price_per_lb_basis",
    "price_per_oz",
    "price_per_oz_basis",
    "is_discount_line",
    "is_coupon_line",
    "is_fee",
    "raw_order_path",
]

EXAMPLE_FIELDS = [
    "example_name",
    "canonical_product_id",
    "giant_purchase_date",
    "giant_raw_item_name",
    "giant_price_per_lb",
    "costco_purchase_date",
    "costco_raw_item_name",
    "costco_price_per_lb",
    "notes",
]

CATALOG_FIELDS = [
    "canonical_product_id",
    "canonical_name",
    "category",
    "product_type",
    "brand",
    "variant",
    "size_value",
    "size_unit",
    "pack_qty",
    "measure_type",
    "notes",
    "created_at",
    "updated_at",
]

RESOLUTION_FIELDS = [
    "observed_product_id",
    "canonical_product_id",
    "resolution_action",
    "status",
    "resolution_notes",
    "reviewed_at",
]


def decimal_or_zero(value):
    return to_decimal(value) or Decimal("0")


def derive_metrics(row):
    line_total = to_decimal(row.get("line_total"))
    qty = to_decimal(row.get("qty"))
    pack_qty = to_decimal(row.get("pack_qty"))
    size_value = to_decimal(row.get("size_value"))
    picked_weight = to_decimal(row.get("picked_weight"))
    size_unit = row.get("size_unit", "")
    price_per_each = row.get("price_per_each", "")
    price_per_lb = row.get("price_per_lb", "")
    price_per_oz = row.get("price_per_oz", "")
    price_per_count = ""
    basis_each = ""
    basis_count = ""
    basis_lb = ""
    basis_oz = ""

    if price_per_each:
        basis_each = "line_total_over_qty"
    elif line_total is not None and qty not in (None, 0):
        price_per_each = format_decimal(line_total / qty)
        basis_each = "line_total_over_qty"

    if line_total is not None and pack_qty not in (None, 0):
        total_count = pack_qty * (qty or Decimal("1"))
        if total_count not in (None, 0):
            price_per_count = format_decimal(line_total / total_count)
            basis_count = "line_total_over_pack_qty"

    if picked_weight not in (None, 0):
        price_per_lb = format_decimal(line_total / picked_weight) if line_total is not None else ""
        price_per_oz = (
            format_decimal((line_total / picked_weight) / Decimal("16"))
            if line_total is not None
            else ""
        )
        basis_lb = "picked_weight_lb"
        basis_oz = "picked_weight_lb_to_oz"
    elif line_total is not None and size_value not in (None, 0):
        total_units = size_value * (pack_qty or Decimal("1")) * (qty or Decimal("1"))
        if size_unit == "lb" and total_units not in (None, 0):
            per_lb = line_total / total_units
            price_per_lb = format_decimal(per_lb)
            price_per_oz = format_decimal(per_lb / Decimal("16"))
            basis_lb = "parsed_size_lb"
            basis_oz = "parsed_size_lb_to_oz"
        elif size_unit == "oz" and total_units not in (None, 0):
            per_oz = line_total / total_units
            price_per_oz = format_decimal(per_oz)
            price_per_lb = format_decimal(per_oz * Decimal("16"))
            basis_lb = "parsed_size_oz_to_lb"
            basis_oz = "parsed_size_oz"

    return {
        "price_per_each": price_per_each,
        "price_per_each_basis": basis_each,
        "price_per_count": price_per_count,
        "price_per_count_basis": basis_count,
        "price_per_lb": price_per_lb,
        "price_per_lb_basis": basis_lb,
        "price_per_oz": price_per_oz,
        "price_per_oz_basis": basis_oz,
    }


def order_lookup(rows, retailer):
    return {
        (retailer, row["order_id"]): row
        for row in rows
    }


def read_optional_csv_rows(path):
    path = Path(path)
    if not path.exists():
        return []
    return read_csv_rows(path)


def load_resolution_lookup(resolution_rows):
    lookup = {}
    for row in resolution_rows:
        if not row.get("observed_product_id"):
            continue
        lookup[row["observed_product_id"]] = row
    return lookup


def merge_catalog_rows(existing_rows, auto_rows):
    merged = {}
    for row in auto_rows + existing_rows:
        canonical_product_id = row.get("canonical_product_id", "")
        if canonical_product_id:
            merged[canonical_product_id] = row
    return sorted(merged.values(), key=lambda row: row["canonical_product_id"])


def catalog_row_from_canonical(row):
    return {
        "canonical_product_id": row.get("canonical_product_id", ""),
        "canonical_name": row.get("canonical_name", ""),
        "category": row.get("category", ""),
        "product_type": row.get("product_type", ""),
        "brand": row.get("brand", ""),
        "variant": row.get("variant", ""),
        "size_value": row.get("size_value", ""),
        "size_unit": row.get("size_unit", ""),
        "pack_qty": row.get("pack_qty", ""),
        "measure_type": row.get("measure_type", ""),
        "notes": row.get("notes", ""),
        "created_at": row.get("created_at", ""),
        "updated_at": row.get("updated_at", ""),
    }


def build_link_state(enriched_rows):
    observed_rows = build_observed_products.build_observed_products(enriched_rows)
    canonical_rows, link_rows = build_canonical_layer.build_canonical_layer(observed_rows)
    giant_row, costco_row = validate_cross_retailer_flow.find_proof_pair(observed_rows)
    canonical_rows, link_rows, _proof_rows = validate_cross_retailer_flow.merge_proof_pair(
        canonical_rows,
        link_rows,
        giant_row,
        costco_row,
    )

    observed_id_by_key = {
        row["observed_key"]: row["observed_product_id"] for row in observed_rows
    }
    canonical_id_by_observed = {
        row["observed_product_id"]: row["canonical_product_id"] for row in link_rows
    }
    return observed_rows, canonical_rows, link_rows, observed_id_by_key, canonical_id_by_observed


def build_purchase_rows(
    giant_enriched_rows,
    costco_enriched_rows,
    giant_orders,
    costco_orders,
    resolution_rows,
):
    all_enriched_rows = giant_enriched_rows + costco_enriched_rows
    (
        observed_rows,
        canonical_rows,
        link_rows,
        observed_id_by_key,
        canonical_id_by_observed,
    ) = build_link_state(all_enriched_rows)
    resolution_lookup = load_resolution_lookup(resolution_rows)
    for observed_product_id, resolution in resolution_lookup.items():
        action = resolution.get("resolution_action", "")
        status = resolution.get("status", "")
        if status != "approved":
            continue
        if action in {"link", "create"} and resolution.get("canonical_product_id"):
            canonical_id_by_observed[observed_product_id] = resolution["canonical_product_id"]
        elif action == "exclude":
            canonical_id_by_observed[observed_product_id] = ""

    orders_by_id = {}
    orders_by_id.update(order_lookup(giant_orders, "giant"))
    orders_by_id.update(order_lookup(costco_orders, "costco"))

    purchase_rows = []
    for row in sorted(
        all_enriched_rows,
        key=lambda item: (item["order_date"], item["retailer"], item["order_id"], int(item["line_no"])),
    ):
        observed_key = build_observed_products.build_observed_key(row)
        observed_product_id = observed_id_by_key.get(observed_key, "")
        order_row = orders_by_id.get((row["retailer"], row["order_id"]), {})
        metrics = derive_metrics(row)
        resolution = resolution_lookup.get(observed_product_id, {})
        purchase_rows.append(
            {
                "purchase_date": row["order_date"],
                "retailer": row["retailer"],
                "order_id": row["order_id"],
                "line_no": row["line_no"],
                "observed_item_key": row["observed_item_key"],
                "observed_product_id": observed_product_id,
                "canonical_product_id": canonical_id_by_observed.get(observed_product_id, ""),
                "review_status": resolution.get("status", ""),
                "resolution_action": resolution.get("resolution_action", ""),
                "raw_item_name": row["item_name"],
                "normalized_item_name": row["item_name_norm"],
                "image_url": row.get("image_url", ""),
                "retailer_item_id": row["retailer_item_id"],
                "upc": row["upc"],
                "qty": row["qty"],
                "unit": row["unit"],
                "pack_qty": row["pack_qty"],
                "size_value": row["size_value"],
                "size_unit": row["size_unit"],
                "measure_type": row["measure_type"],
                "line_total": row["line_total"],
                "unit_price": row["unit_price"],
                "store_name": order_row.get("store_name", ""),
                "store_number": order_row.get("store_number", ""),
                "store_city": order_row.get("store_city", ""),
                "store_state": order_row.get("store_state", ""),
                "is_discount_line": row["is_discount_line"],
                "is_coupon_line": row["is_coupon_line"],
                "is_fee": row["is_fee"],
                "raw_order_path": row["raw_order_path"],
                **metrics,
            }
        )
    return purchase_rows, observed_rows, canonical_rows, link_rows


def apply_manual_resolutions_to_links(link_rows, resolution_rows):
    link_by_observed = {row["observed_product_id"]: dict(row) for row in link_rows}
    for resolution in resolution_rows:
        if resolution.get("status") != "approved":
            continue
        observed_product_id = resolution.get("observed_product_id", "")
        action = resolution.get("resolution_action", "")
        if not observed_product_id:
            continue
        if action == "exclude":
            link_by_observed.pop(observed_product_id, None)
            continue
        if action in {"link", "create"} and resolution.get("canonical_product_id"):
            link_by_observed[observed_product_id] = {
                "observed_product_id": observed_product_id,
                "canonical_product_id": resolution["canonical_product_id"],
                "link_method": f"manual_{action}",
                "link_confidence": "high",
                "review_status": resolution.get("status", ""),
                "reviewed_by": "",
                "reviewed_at": resolution.get("reviewed_at", ""),
                "link_notes": resolution.get("resolution_notes", ""),
            }
    return sorted(link_by_observed.values(), key=lambda row: row["observed_product_id"])


def build_comparison_examples(purchase_rows):
    giant_banana = None
    costco_banana = None
    for row in purchase_rows:
        if row.get("normalized_item_name") != "BANANA":
            continue
        if not row.get("canonical_product_id"):
            continue
        if row["retailer"] == "giant" and row.get("price_per_lb"):
            giant_banana = row
        if row["retailer"] == "costco" and row.get("price_per_lb"):
            costco_banana = row

    if not giant_banana or not costco_banana:
        return []

    return [
        {
            "example_name": "banana_price_per_lb",
            "canonical_product_id": giant_banana["canonical_product_id"],
            "giant_purchase_date": giant_banana["purchase_date"],
            "giant_raw_item_name": giant_banana["raw_item_name"],
            "giant_price_per_lb": giant_banana["price_per_lb"],
            "costco_purchase_date": costco_banana["purchase_date"],
            "costco_raw_item_name": costco_banana["raw_item_name"],
            "costco_price_per_lb": costco_banana["price_per_lb"],
            "notes": "Example comparison using normalized price_per_lb across Giant and Costco",
        }
    ]


@click.command()
@click.option("--giant-items-enriched-csv", default="giant_output/items_enriched.csv", show_default=True)
@click.option("--costco-items-enriched-csv", default="costco_output/items_enriched.csv", show_default=True)
@click.option("--giant-orders-csv", default="giant_output/orders.csv", show_default=True)
@click.option("--costco-orders-csv", default="costco_output/orders.csv", show_default=True)
@click.option("--resolutions-csv", default="combined_output/review_resolutions.csv", show_default=True)
@click.option("--catalog-csv", default="combined_output/canonical_catalog.csv", show_default=True)
@click.option("--links-csv", default="combined_output/product_links.csv", show_default=True)
@click.option("--output-csv", default="combined_output/purchases.csv", show_default=True)
@click.option("--examples-csv", default="combined_output/comparison_examples.csv", show_default=True)
def main(
    giant_items_enriched_csv,
    costco_items_enriched_csv,
    giant_orders_csv,
    costco_orders_csv,
    resolutions_csv,
    catalog_csv,
    links_csv,
    output_csv,
    examples_csv,
):
    resolution_rows = read_optional_csv_rows(resolutions_csv)
    purchase_rows, _observed_rows, canonical_rows, link_rows = build_purchase_rows(
        read_csv_rows(giant_items_enriched_csv),
        read_csv_rows(costco_items_enriched_csv),
        read_csv_rows(giant_orders_csv),
        read_csv_rows(costco_orders_csv),
        resolution_rows,
    )
    existing_catalog_rows = read_optional_csv_rows(catalog_csv)
    merged_catalog_rows = merge_catalog_rows(
        existing_catalog_rows,
        [catalog_row_from_canonical(row) for row in canonical_rows],
    )
    link_rows = apply_manual_resolutions_to_links(link_rows, resolution_rows)
    example_rows = build_comparison_examples(purchase_rows)
    write_csv_rows(catalog_csv, merged_catalog_rows, CATALOG_FIELDS)
    write_csv_rows(links_csv, link_rows, build_canonical_layer.LINK_FIELDS)
    write_csv_rows(output_csv, purchase_rows, PURCHASE_FIELDS)
    write_csv_rows(examples_csv, example_rows, EXAMPLE_FIELDS)
    click.echo(
        f"wrote {len(purchase_rows)} purchase rows to {output_csv}, "
        f"{len(merged_catalog_rows)} catalog rows to {catalog_csv}, "
        f"and {len(example_rows)} comparison examples to {examples_csv}"
    )


if __name__ == "__main__":
    main()
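The size-based branch of `derive_metrics` comes down to one division and an ounce-to-pound conversion, which may be easier to check in isolation. The following is a minimal sketch under stated assumptions: `price_per_lb_from_size` is a hypothetical standalone helper, not a function in the repo, and it returns a raw `Decimal`, whereas the real code passes results through `format_decimal` (whose rounding is not shown in this diff).

```python
from decimal import Decimal

OZ_PER_LB = Decimal("16")


def price_per_lb_from_size(line_total, size_value, size_unit, pack_qty=None, qty=None):
    """Return price per lb as a Decimal, or None when it cannot be derived."""
    if line_total is None or not size_value:
        return None
    # Total purchased size: size per unit x units per pack x packs bought.
    total_units = size_value * (pack_qty or Decimal("1")) * (qty or Decimal("1"))
    if size_unit == "lb":
        return line_total / total_units
    if size_unit == "oz":
        return (line_total / total_units) * OZ_PER_LB
    # Other units (count, each, ...) have no weight-based price here.
    return None


# Two 24 oz items bought for 7.50 total: 7.50 / 48 oz * 16 = 2.50 per lb.
print(price_per_lb_from_size(Decimal("7.50"), Decimal("24"), "oz", qty=Decimal("2")))
```

As in the original function, a missing `pack_qty` or `qty` is treated as 1, so a row with only `size_value` and `line_total` still yields a unit price.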


@@ -213,9 +213,11 @@ def parse_costco_item(order_id, order_date, raw_path, line_no, item):
def iter_costco_rows(raw_dir): def iter_costco_rows(raw_dir):
for path in discover_json_files(raw_dir): for path in discover_json_files(raw_dir):
if path.name == "summary.json": if path.name in {"summary.json", "summary_requests.json"}:
continue continue
payload = json.loads(path.read_text(encoding="utf-8")) payload = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(payload, dict):
continue
receipts = payload.get("data", {}).get("receiptsWithCounts", {}).get("receipts", []) receipts = payload.get("data", {}).get("receiptsWithCounts", {}).get("receipts", [])
for receipt in receipts: for receipt in receipts:
order_id = receipt["transactionBarcode"] order_id = receipt["transactionBarcode"]

73
pm/review-workflow.org Normal file

@@ -0,0 +1,73 @@
* review and item-resolution workflow
This document defines the durable review workflow for unresolved observed
products.
** persistent files
- `combined_output/purchases.csv`
Flat normalized purchase log. This is the review input because it retains:
- raw item name
- normalized item name
- observed product id
- canonical product id when resolved
- retailer/order/date/price context
- `combined_output/review_queue.csv`
Current unresolved observed products grouped for review.
- `combined_output/review_resolutions.csv`
Durable mapping decisions from observed products to canonical products.
- `combined_output/canonical_catalog.csv`
Durable canonical item catalog used by manual review and later purchase-log
rebuilds.
There is no separate alias file in v1. `review_resolutions.csv` is the mapping
layer from observed products to canonical product ids.
** workflow
1. Run `build_purchases.py`
This refreshes the purchase log and seeds/updates the canonical catalog from
current auto-linked canonical rows.
2. Run `review_products.py`
This rebuilds `review_queue.csv` from unresolved purchase rows and prompts in
the terminal for one observed product at a time.
3. Choose one of:
- link to existing canonical
- create new canonical
- exclude
- skip
4. `review_products.py` writes decisions immediately to:
- `review_resolutions.csv`
- `canonical_catalog.csv` when a new canonical item is created
5. Rerun `build_purchases.py`
This reapplies approved resolutions so the final normalized purchase log now
carries the reviewed `canonical_product_id`.
** what the human edits
The primary interface is terminal prompts in `review_products.py`.
The human provides:
- existing canonical id when linking
- canonical name/category/product type when creating a new canonical item
- optional resolution notes
The generated CSVs remain editable by hand if needed, but the intended workflow
is terminal-first.
** durability
- Resolutions are keyed by `observed_product_id`, not by one-off text
substitution.
- Canonical products are keyed by stable `canonical_product_id`.
- Future runs reuse approved mappings through `review_resolutions.csv`.
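The durability rules above can be illustrated with a small overlay step: approved decisions keyed by `observed_product_id` win over auto-generated links on every rebuild. This is a minimal sketch, assuming resolution rows shaped like `review_resolutions.csv`; `apply_resolutions` is a hypothetical name and the ids in the example are made up.

```python
def apply_resolutions(auto_links, resolutions):
    """Overlay approved manual decisions on auto-generated links.

    auto_links: dict of observed_product_id -> canonical_product_id
    resolutions: list of dict rows shaped like review_resolutions.csv
    """
    links = dict(auto_links)
    for row in resolutions:
        observed_id = row.get("observed_product_id", "")
        # Only approved decisions for a known observed product are applied.
        if not observed_id or row.get("status") != "approved":
            continue
        action = row.get("resolution_action", "")
        if action in {"link", "create"} and row.get("canonical_product_id"):
            links[observed_id] = row["canonical_product_id"]
        elif action == "exclude":
            # Excluded products keep no canonical identity.
            links[observed_id] = ""
    return links


# Hypothetical ids, for illustration only.
auto = {"obs_1": "can_a", "obs_2": "can_b"}
decisions = [
    {"observed_product_id": "obs_1", "canonical_product_id": "can_z",
     "resolution_action": "link", "status": "approved"},
    {"observed_product_id": "obs_2", "canonical_product_id": "",
     "resolution_action": "exclude", "status": "approved"},
    {"observed_product_id": "obs_3", "canonical_product_id": "can_q",
     "resolution_action": "create", "status": "pending"},
]
print(apply_resolutions(auto, decisions))
```

Because the overlay is keyed by id rather than by item text, rerunning it with the same resolutions file gives the same result regardless of how the raw names are later re-parsed.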
** retention of audit fields
The final `purchases.csv` retains:
- `raw_item_name`
- `normalized_item_name`
- `canonical_product_id`
This preserves the raw receipt description, the deterministic parser output, and
the human-approved canonical identity in one flat purchase log.


@@ -27,6 +27,8 @@ carry forward image url
3. build observed-product table from enriched items 3. build observed-product table from enriched items
* git issues * git issues
** ssh / access to gitea
ssh://git@192.168.1.207:2020/ben/scrape-giant.git ssh://git@192.168.1.207:2020/ben/scrape-giant.git
https://git.hgsky.me/ben/scrape-giant.git https://git.hgsky.me/ben/scrape-giant.git
@@ -44,6 +46,31 @@ git remote set-url gitea git@gitea:ben/scrape-giant.git
on local network: use ssh to 192.168.1.207:2020 on local network: use ssh to 192.168.1.207:2020
from elsewhere/public: use https to git.hgsky.me/... unless you later expose ssh properly from elsewhere/public: use https to git.hgsky.me/... unless you later expose ssh properly
** stash
z z to stash local work only
take care not to add ignored files which will add the venv and `__pycache__`
z p to pop the stash back
** creating remote branches
P p, magit will suggest upstream (gitea), select and Enter and it will be created
** cherry-picking
b b : switch to desired branch (review)
l B : open reflog for local branches
(my changes were committed to local cx but not pushed to gitea/cx)
put point on the commit you want; did this in sequence
A A : cherry pick commit to current branch
minibuffer will show the commit and all branches, leave it on that commit
the final commit was not shown by hash, just the branch cx
since (local) cx was caught up with that branch
** reverting a branch
b l : switch to local branch (cx)
l l : open local reflog
put point on the commit; highlighted remote gitea/cx
X : reset branch; prompts you, selected cx
* giant requests * giant requests
** item: ** item:
get: get:
@@ -125,6 +152,14 @@ request-context: appId=cid-v1:75750625-0c81-4f08-9f5d-ce4f73198e54
X-Firefox-Spdy: h2 X-Firefox-Spdy: h2
* costco requests * costco requests
- localstorage idToken has the auth token, but needs "Bearer " prepended
- localstorage clientID has the COSTCO_X_WCS_CLIENTID
- I don't see the client_identifier uuid anywhere.
we will pull from .env first (may have to hardcode)
then overwrite with session data (token)
hopefully this doesn't change.
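A rough sketch of that precedence (`.env` first, then session values on top) might look like the following. It is only an illustration under assumptions: `build_costco_auth_values` is a hypothetical name, both inputs are plain dicts, and the result is keyed by the `.env` variable names rather than by real HTTP header names, which these notes do not spell out.

```python
def build_costco_auth_values(env, local_storage):
    """Merge .env values with Firefox local storage, session values winning."""
    values = {
        "COSTCO_X_AUTHORIZATION": env.get("COSTCO_X_AUTHORIZATION", ""),
        "COSTCO_X_WCS_CLIENTID": env.get("COSTCO_X_WCS_CLIENTID", ""),
        # No session source was found for this one, so it only comes from .env.
        "COSTCO_CLIENT_IDENTIFIER": env.get("COSTCO_CLIENT_IDENTIFIER", ""),
    }
    id_token = local_storage.get("idToken", "")
    if id_token:
        # The stored token is bare; the auth value needs the "Bearer " prefix.
        if not id_token.startswith("Bearer "):
            id_token = "Bearer " + id_token
        values["COSTCO_X_AUTHORIZATION"] = id_token
    client_id = local_storage.get("clientID", "")
    if client_id:
        values["COSTCO_X_WCS_CLIENTID"] = client_id
    return values


# Placeholder values, for illustration only.
print(build_costco_auth_values(
    {"COSTCO_CLIENT_IDENTIFIER": "uuid-from-env", "COSTCO_X_AUTHORIZATION": "Bearer stale"},
    {"idToken": "fresh-token", "clientID": "client-from-session"},
))
```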
** warehouse ** warehouse
*** POST *** POST
https://ecom-api.costco.com/ebusiness/order/v1/orders/graphql https://ecom-api.costco.com/ebusiness/order/v1/orders/graphql
@@ -204,3 +239,29 @@ request:
- pull all orders by default - pull all orders by default
- add online orders - add online orders
- copy header data from browser using selenium - copy header data from browser using selenium
* how to run
python scrape_giant.py
python enrich_giant.py
python scrape_costco.py
python enrich_costco.py
python build_observed_products.py
python build_review_queue.py
python build_canonical_layer.py
python validate_cross_retailer_flow.py
* t1.11 tasks [2026-03-17 Tue 13:49]
ok i ran a few. time to run some cleanups here - i'm wondering if we shouldn't be less aggressive with canonical names and encourage a better manual process to start.
1. auto-created canonical_names lack category, product_type - ok with filling these in manually in the catalog once the queue is empty
2. canonical_names feel too specific, e.g., "5DZ egg"
3. some canonical_names need consolidation, e.g., "LIME" and "LIME . / ."; possibly a cleanup issue. there are 5 entries for egg, but they are all regular large grade A white eggs, just different amounts in dozens.
Eggs are actually a great candidate for the kind of analysis we want to do - the pipeline should have caught and properly sorted these into size/qty:
```
canonical_product_id canonical_name category product_type brand variant size_value size_unit pack_qty measure_type notes created_at updated_at
gcan_0e350505fd22 5DZ EGG / / KS each auto-linked via exact_name
gcan_47279a80f5f3 EGG 5 DOZ. BBS each auto-linked via exact_name
gcan_7d099130c1bf LRG WHITE EGG SB 30 count auto-linked via exact_upc
gcan_849c2817e667 GDA LRG WHITE EGG SB 18 count auto-linked via exact_upc
gcan_cb0c6c8cf480 LG EGG CONVENTIONAL 18 count count auto-linked via exact_name_size
```
4. Build costco mechanism for matching discount to line item.
1. Discounts appear as their own line items with a number like /123456, this matches the UPC of the discounted item
2. must be date-matched to the UPC
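A rough sketch of the discount matching described in item 4. The row keys (`purchase_date`, `upc`, `raw_item_name`, `line_total`) and the idea that the `/123456` number shows up in the raw item name are assumptions for illustration; `match_discounts` is a hypothetical helper, not something in the repo yet.

```python
from collections import defaultdict


def match_discounts(rows):
    """Pair discount lines with the item they discount, by date and UPC (sketch)."""
    items_by_key = defaultdict(list)
    discounts = []
    for row in rows:
        name = row["raw_item_name"].strip()
        if name.startswith("/") and name[1:].isdigit():
            # assumed discount line: "/<upc of the discounted item>"
            discounts.append((row, name[1:]))
        else:
            items_by_key[(row["purchase_date"], row["upc"])].append(row)

    matched, unmatched = [], []
    for discount, target_upc in discounts:
        candidates = items_by_key.get((discount["purchase_date"], target_upc), [])
        if candidates:
            matched.append((candidates[0], discount))
        else:
            # keep unmatched discounts visible instead of guessing
            unmatched.append(discount)
    return matched, unmatched
```

Returning the unmatched discounts separately keeps the step auditable: anything that fails the date + UPC match can go to manual review rather than being attached to the wrong item.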


@@ -276,9 +276,49 @@
- commit: `7789c2e` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python scrape_giant.py --help`; `./venv/bin/python scrape_costco.py --help`; verified Firefox storage token extraction and locked-db copy behavior in unit tests
- date: 2026-03-16
-* [ ] t1.9: compute normalized comparison metrics (2-4 commits)
+* [X] t1.8.7: simplify costco session bootstrap and remove over-abstraction (2-4 commits)
** acceptance criteria
- make `scrape_costco.py` readable end-to-end without tracing through multiple partial bootstrap layers
- keep `browser_session.py` limited to low-level browser data access only:
  - firefox profile discovery
  - cookie loading
  - storage reads
  - sqlite copy/read helpers
- remove or sharply reduce `retailer_sessions.py` so retailer-specific header extraction lives with the retailer scraper or in a very small retailer-specific helper
- make session bootstrap flow explicit and linear:
  - load browser context
  - extract costco auth values
  - build request headers
  - build requests session
- eliminate inconsistent/obsolete function signatures and dead call paths (e.g. mixed `build_session(...)` calling conventions, stale fallback branches, mismatched `build_headers(...)` args)
- add one focused bootstrap debug print showing whether cookies, authorization, client id, and client identifier were found
- preserve current working behavior where available; this is a refactor/clarification task, not a feature expansion task
** notes
- goal is to restore concern separation and debuggability
- prefer obvious retailer-specific code over “generic” helpers that guess and obscure control flow
- browser access can stay shared; retailer auth mapping should be explicit
- no new heuristics in this task
** evidence
- commit: `d7a0329` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python scrape_costco.py --help`; verified explicit Costco session bootstrap flow in `scrape_costco.py` and low-level-only browser access in `browser_session.py`
- date: 2026-03-16
* [X] t1.9: build pivot-ready normalized purchase log and comparison metrics (2-4 commits)
** acceptance criteria
- produce a flat `purchases.csv` suitable for excel pivot tables and pivot charts
- each purchase row preserves:
  - purchase date
  - retailer
  - order id
  - raw item name
  - normalized item name
  - canonical item id when resolved
  - quantity / unit
  - line total
  - store/location info where available
- derive normalized comparison fields where possible on enriched or observed product rows:
  - `price_per_lb`
  - `price_per_oz`
@@ -289,17 +329,92 @@
  - receipt weight
  - explicit count/pack
- emit nulls when basis is unknown, conflicting, or ambiguous
- support pivot-friendly analysis of purchase frequency and item cost over time
- document at least one Giant vs Costco comparison example using the normalized metrics
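The "emit nulls when basis is unknown" rule for `price_per_lb` and `price_per_oz` could look roughly like this. It is only a sketch: `unit_prices` is a hypothetical helper, the inputs are assumed to be the string fields from a purchase row, and "oz" is assumed to mean weight ounces (fluid ounces would need their own basis).

```python
OZ_PER_LB = 16


def unit_prices(line_total, size_value, size_unit):
    """Return price_per_lb / price_per_oz, or None for both when the basis is unknown."""
    empty = {"price_per_lb": None, "price_per_oz": None}
    try:
        total = float(line_total)
        size = float(size_value)
    except (TypeError, ValueError):
        return empty  # missing or unparseable values: no metric, no guess
    if size <= 0:
        return empty

    unit = (size_unit or "").strip().lower()
    if unit == "lb":
        ounces = size * OZ_PER_LB
    elif unit == "oz":
        ounces = size
    else:
        return empty  # counts, "each", or unknown units are not weight-comparable

    price_per_oz = total / ounces
    return {
        "price_per_lb": round(price_per_oz * OZ_PER_LB, 4),
        "price_per_oz": round(price_per_oz, 4),
    }
```

Returning None instead of 0 keeps pivot averages honest, since spreadsheet tools skip blanks but would count zeros.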
** notes
- compute metrics as close to the raw observation as possible
- canonical layer can aggregate later, but should not invent missing unit economics
- unit discipline matters more than coverage
- raw item name must be retained for audit/debugging
** evidence
- commit: `be1bf63` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python build_purchases.py`; verified `combined_output/purchases.csv` and `combined_output/comparison_examples.csv` on the current Giant + Costco dataset
- date: 2026-03-16
* [X] t1.11: define review and item-resolution workflow for unresolved products (2-3 commits)
** acceptance criteria
- define the persistent files used to resolve unknown items, including:
  - review queue
  - canonical item catalog
  - alias / mapping layer if separate
- specify how unresolved items move from `review_queue.csv` into the final normalized purchase log
- define the manual resolution workflow, including:
  - what the human edits
  - what script is rerun afterward
  - how resolved mappings are persisted for future runs
- ensure resolved items are positively identified into stable canonical item ids rather than one-off text substitutions
- document how raw item name, normalized item name, and canonical item id are all retained
** notes
- goal is “approve once, reuse forever”
- keep the workflow simple and auditable
- manual review is fine; the important part is making it durable and rerunnable
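One way to picture "approve once, reuse forever": on every rerun, approved rows from the resolutions file are looked up by `observed_product_id` and stamped onto the purchase rows. The sketch below uses the column names that appear in the review workflow (`observed_product_id`, `canonical_product_id`, `resolution_action`, `status`); `apply_resolutions` itself is a hypothetical helper and may not match how `build_purchases.py` actually does it.

```python
def apply_resolutions(purchase_rows, resolution_rows):
    """Attach approved canonical ids to purchase rows without touching raw names (sketch)."""
    approved = {
        row["observed_product_id"]: row
        for row in resolution_rows
        if row.get("status") == "approved"
    }

    resolved = []
    for row in purchase_rows:
        updated = dict(row)  # copy, so raw and normalized names stay as recorded
        resolution = approved.get(row.get("observed_product_id", ""))
        if resolution and resolution.get("resolution_action") != "exclude":
            updated["canonical_product_id"] = resolution.get("canonical_product_id", "")
        resolved.append(updated)
    return resolved
```

Because the lookup is keyed on the observed product rather than on item text, a newly scraped receipt with the same observed product picks up the earlier decision with no further review.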
** evidence
- commit: `c7dad54` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python build_purchases.py`; `./venv/bin/python review_products.py --refresh-only`; verified `combined_output/review_queue.csv`, `combined_output/review_resolutions.csv` workflow, and `combined_output/canonical_catalog.csv`
- date: 2026-03-16
* [X] t1.12: simplify review process display
Clearly show current state separate from proposed future state.
** acceptance criteria
1. Display position in review queue, e.g., (1/22)
2. Display compact header with observed_product under review, queue position, and canonical decision, e.g.: "Resolve [n] observed product group [name] and associated items to canonical_name [name]? (\n [n] matched items)"
3. color-code outputs based on info, input/prompt, warning/error
   1. color action menu/requests for input differently from display text; do not color individual options separately
   2. "no canonical_name suggestions found" is informational, not a warning/error.
4. update action menu `[x]exclude` to `e[x]clude`
5. on each review item, display a list of all matched items to be linked, sorted by descending date:
   1. YYYY-mm-dd, price, raw item name, normalized item name, upc, retailer
   2. image URL, if exists
   3. Sample:
6. on each review item, suggest (but do not auto-apply) up to 3 likely existing canonicals using deterministic rules, e.g.:
   1. exact normalized name match
   2. prefix/contains match on canonical name
   3. exact UPC
7. Sample Entry:
#+begin_comment
Review 7/22: Resolve observed_product MIXED PEPPER to canonical_name [__]?
2 matched items:
[1] 2026-03-12 | 7.49 | MIXED PEPPER 6-PACK | MIXED PEPPER | [upc] | costco | [img_url]
[2] [YYYY-mm-dd] | [price] | [raw_name] | [observed_name] | [upc] | [retailer] | [img_url]
2 canonical suggestions found:
[1] BELL PEPPERS, PRODUCE
[2] PEPPER, SPICES
#+end_comment
8. When link is selected, users should be able to select the number of the item in the list, e.g.:
#+begin_comment
Select the canonical_name to associate [n] items with:
[1] GRB GRADU PCH PUF1. | gcan_01b0d623aa02
[2] BTB CHICKEN | gcan_0201f0feb749
[3] LIME | gcan_02074d9e7359
#+end_comment
9. Add confirmation to link selection with instructions, "[n] [observed_name] and future observed_name matches will be associated with [canonical_name], is this ok?"
actions: [Y]es [n]o [b]ack [s]kip [q]uit
- reinforce project terminology such as raw_name, observed_name, canonical_name
** evidence
- commit: `7b8141c`, `d39497c`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python -m unittest tests.test_review_workflow tests.test_purchases`; `./venv/bin/python review_products.py --help`; verified compact review header, numbered matched-item display, informational no-suggestion state, numbered canonical selection, and confirmation flow
- date: 2026-03-17
** notes
- The key improvement was shifting the prompt from system metadata to reviewer intent: one observed_product, its matched retailer rows, and one canonical_name decision.
- Numbered canonical selection plus confirmation worked better than free-text id entry and should reduce accidental links.
- Deterministic suggestions remain intentionally conservative; they speed up common cases, but unresolved items still depend on human review by design.
* [ ] t1.10: add optional llm-assisted suggestion workflow for unresolved products (2-4 commits)


@@ -1,136 +0,0 @@
import json
import re
from dataclasses import dataclass

from browser_session import load_browser_context


UUID_RE = re.compile(
    r"^[0-9a-fA-F]{8}-"
    r"[0-9a-fA-F]{4}-"
    r"[0-9a-fA-F]{4}-"
    r"[0-9a-fA-F]{4}-"
    r"[0-9a-fA-F]{12}$"
)
JWT_RE = re.compile(r"^[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+$")


@dataclass
class RetailerSession:
    cookies: object
    headers: dict[str, str]


def load_giant_session(browser="firefox", profile_dir=None):
    context = load_browser_context(
        browser=browser,
        domain_name="giantfood.com",
        storage_origins=["giantfood.com"],
        profile_dir=profile_dir,
    )
    return RetailerSession(cookies=context.cookies, headers={})


def load_costco_session(browser="firefox", profile_dir=None):
    context = load_browser_context(
        browser=browser,
        domain_name=".costco.com",
        storage_origins=["costco.com"],
        profile_dir=profile_dir,
    )
    return RetailerSession(
        cookies=context.cookies,
        headers=extract_costco_headers(context.storage_entries),
    )


def extract_costco_headers(storage_entries):
    authorization = ""
    client_id = ""
    client_identifier = ""
    for key_path, value in iter_storage_candidates(storage_entries):
        normalized_key = normalize_key(key_path)
        normalized_value = str(value).strip()
        if not normalized_value:
            continue
        if not authorization and looks_like_authorization(normalized_key, normalized_value):
            authorization = normalize_authorization(normalized_value)
            continue
        if not client_identifier and looks_like_client_identifier(
            normalized_key, normalized_value
        ):
            client_identifier = normalized_value
            continue
        if not client_id and looks_like_client_id(normalized_key, normalized_value):
            client_id = normalized_value
    headers = {}
    if authorization:
        headers["costco-x-authorization"] = authorization
    if client_id:
        headers["costco-x-wcs-clientId"] = client_id
    if client_identifier:
        headers["client-identifier"] = client_identifier
    return headers


def iter_storage_candidates(storage_entries):
    for entry in storage_entries:
        yield entry.key, entry.value
        yield from walk_candidate_value(entry.key, parse_json_value(entry.value))


def walk_candidate_value(prefix, value):
    if isinstance(value, dict):
        for key, nested in value.items():
            nested_prefix = f"{prefix}.{key}"
            yield nested_prefix, nested
            yield from walk_candidate_value(nested_prefix, nested)
    elif isinstance(value, list):
        for index, nested in enumerate(value):
            nested_prefix = f"{prefix}[{index}]"
            yield nested_prefix, nested
            yield from walk_candidate_value(nested_prefix, nested)


def parse_json_value(value):
    if not isinstance(value, str):
        return value
    text = value.strip()
    if not text or text[0] not in "{[":
        return value
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return value


def normalize_key(value):
    return re.sub(r"[^a-z0-9]+", "", value.lower())


def looks_like_authorization(key, value):
    return (
        ("authorization" in key or "token" in key)
        and bool(normalize_authorization(value))
    )


def normalize_authorization(value):
    candidate = str(value).strip()
    if candidate.lower().startswith("bearer "):
        token = candidate.split(None, 1)[1].strip()
        return f"Bearer {token}" if JWT_RE.match(token) else ""
    if JWT_RE.match(candidate):
        return f"Bearer {candidate}"
    return ""


def looks_like_client_id(key, value):
    return "clientid" in key and "identifier" not in key and bool(UUID_RE.match(value))


def looks_like_client_identifier(key, value):
    return "clientidentifier" in key and bool(UUID_RE.match(value))

review_products.py (new file)

@@ -0,0 +1,426 @@
from collections import defaultdict
from datetime import date

import click

import build_purchases
from layer_helpers import compact_join, stable_id, write_csv_rows


QUEUE_FIELDS = [
    "review_id",
    "retailer",
    "observed_product_id",
    "canonical_product_id",
    "reason_code",
    "priority",
    "raw_item_names",
    "normalized_names",
    "upc_values",
    "example_prices",
    "seen_count",
    "status",
    "resolution_action",
    "resolution_notes",
    "created_at",
    "updated_at",
]


def build_review_queue(purchase_rows, resolution_rows):
    by_observed = defaultdict(list)
    resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows)
    for row in purchase_rows:
        observed_product_id = row.get("observed_product_id", "")
        if not observed_product_id:
            continue
        by_observed[observed_product_id].append(row)
    today_text = str(date.today())
    queue_rows = []
    for observed_product_id, rows in sorted(by_observed.items()):
        current_resolution = resolution_lookup.get(observed_product_id, {})
        if current_resolution.get("status") == "approved":
            continue
        unresolved_rows = [row for row in rows if not row.get("canonical_product_id")]
        if not unresolved_rows:
            continue
        retailers = sorted({row["retailer"] for row in rows})
        review_id = stable_id("rvw", observed_product_id)
        queue_rows.append(
            {
                "review_id": review_id,
                "retailer": " | ".join(retailers),
                "observed_product_id": observed_product_id,
                "canonical_product_id": current_resolution.get("canonical_product_id", ""),
                "reason_code": "missing_canonical_link",
                "priority": "high",
                "raw_item_names": compact_join(
                    sorted({row["raw_item_name"] for row in rows if row["raw_item_name"]}),
                    limit=8,
                ),
                "normalized_names": compact_join(
                    sorted(
                        {
                            row["normalized_item_name"]
                            for row in rows
                            if row["normalized_item_name"]
                        }
                    ),
                    limit=8,
                ),
                "upc_values": compact_join(
                    sorted({row["upc"] for row in rows if row["upc"]}),
                    limit=8,
                ),
                "example_prices": compact_join(
                    sorted({row["line_total"] for row in rows if row["line_total"]}),
                    limit=8,
                ),
                "seen_count": str(len(rows)),
                "status": current_resolution.get("status", "pending"),
                "resolution_action": current_resolution.get("resolution_action", ""),
                "resolution_notes": current_resolution.get("resolution_notes", ""),
                "created_at": current_resolution.get("reviewed_at", today_text),
                "updated_at": today_text,
            }
        )
    return queue_rows


def save_resolution_rows(path, rows):
    write_csv_rows(path, rows, build_purchases.RESOLUTION_FIELDS)


def save_catalog_rows(path, rows):
    write_csv_rows(path, rows, build_purchases.CATALOG_FIELDS)


INFO_COLOR = "cyan"
PROMPT_COLOR = "bright_yellow"
WARNING_COLOR = "magenta"


def sort_related_items(rows):
    return sorted(
        rows,
        key=lambda row: (
            row.get("purchase_date", ""),
            row.get("order_id", ""),
            int(row.get("line_no", "0") or "0"),
        ),
        reverse=True,
    )


def build_canonical_suggestions(related_rows, catalog_rows, limit=3):
    normalized_names = {
        row.get("normalized_item_name", "").strip().upper()
        for row in related_rows
        if row.get("normalized_item_name", "").strip()
    }
    upcs = {
        row.get("upc", "").strip()
        for row in related_rows
        if row.get("upc", "").strip()
    }
    suggestions = []
    seen_ids = set()

    def add_matches(rows, reason):
        for row in rows:
            canonical_product_id = row.get("canonical_product_id", "")
            if not canonical_product_id or canonical_product_id in seen_ids:
                continue
            seen_ids.add(canonical_product_id)
            suggestions.append(
                {
                    "canonical_product_id": canonical_product_id,
                    "canonical_name": row.get("canonical_name", ""),
                    "reason": reason,
                }
            )
            if len(suggestions) >= limit:
                return True
        return False

    exact_upc_rows = [
        row
        for row in catalog_rows
        if row.get("upc", "").strip() and row.get("upc", "").strip() in upcs
    ]
    if add_matches(exact_upc_rows, "exact upc"):
        return suggestions
    exact_name_rows = [
        row
        for row in catalog_rows
        if row.get("canonical_name", "").strip().upper() in normalized_names
    ]
    if add_matches(exact_name_rows, "exact normalized name"):
        return suggestions
    contains_rows = []
    for row in catalog_rows:
        canonical_name = row.get("canonical_name", "").strip().upper()
        if not canonical_name:
            continue
        for normalized_name in normalized_names:
            if normalized_name in canonical_name or canonical_name in normalized_name:
                contains_rows.append(row)
                break
    add_matches(contains_rows, "canonical name contains match")
    return suggestions


def build_display_lines(queue_row, related_rows):
    lines = []
    for index, row in enumerate(sort_related_items(related_rows), start=1):
        lines.append(
            " [{index}] {purchase_date} | {line_total} | {raw_item_name} | {normalized_item_name} | "
            "{upc} | {retailer}".format(
                index=index,
                purchase_date=row.get("purchase_date", ""),
                line_total=row.get("line_total", ""),
                raw_item_name=row.get("raw_item_name", ""),
                normalized_item_name=row.get("normalized_item_name", ""),
                upc=row.get("upc", ""),
                retailer=row.get("retailer", ""),
            )
        )
        if row.get("image_url"):
            lines.append(f" {row['image_url']}")
    if not lines:
        lines.append(" [1] no matched item rows found")
    return lines


def observed_name(queue_row, related_rows):
    if queue_row.get("normalized_names"):
        return queue_row["normalized_names"].split(" | ")[0]
    for row in related_rows:
        if row.get("normalized_item_name"):
            return row["normalized_item_name"]
    return queue_row.get("observed_product_id", "")


def choose_existing_canonical(display_rows, observed_label, matched_count):
    click.secho(
        f"Select the canonical_name to associate {matched_count} items with:",
        fg=INFO_COLOR,
    )
    for index, row in enumerate(display_rows, start=1):
        click.echo(f" [{index}] {row['canonical_name']} | {row['canonical_product_id']}")
    choice = click.prompt(
        click.style("selection", fg=PROMPT_COLOR),
        type=click.IntRange(1, len(display_rows)),
    )
    chosen_row = display_rows[choice - 1]
    click.echo(
        f'{matched_count} "{observed_label}" items and future matches will be associated '
        f'with "{chosen_row["canonical_name"]}".'
    )
    click.secho(
        "actions: [y]es [n]o [b]ack [s]kip [q]uit",
        fg=PROMPT_COLOR,
    )
    confirm = click.prompt(
        click.style("confirm", fg=PROMPT_COLOR),
        type=click.Choice(["y", "n", "b", "s", "q"]),
    )
    if confirm == "y":
        return chosen_row["canonical_product_id"], ""
    if confirm == "s":
        return "", "skip"
    if confirm == "q":
        return "", "quit"
    return "", "back"


def prompt_resolution(queue_row, related_rows, catalog_rows, queue_index, queue_total):
    suggestions = build_canonical_suggestions(related_rows, catalog_rows)
    observed_label = observed_name(queue_row, related_rows)
    matched_count = len(related_rows)
    click.echo("")
    click.secho(
        f"Review {queue_index}/{queue_total}: Resolve observed_product {observed_label} "
        "to canonical_name [__]?",
        fg=INFO_COLOR,
    )
    click.echo(f"{matched_count} matched items:")
    for line in build_display_lines(queue_row, related_rows):
        click.echo(line)
    if suggestions:
        click.echo(f"{len(suggestions)} canonical suggestions found:")
        for index, suggestion in enumerate(suggestions, start=1):
            click.echo(f" [{index}] {suggestion['canonical_name']}")
    else:
        click.echo("no canonical_name suggestions found")
    click.secho(
        "[l]ink existing [n]ew canonical e[x]clude [s]kip [q]uit:",
        fg=PROMPT_COLOR,
    )
    action = click.prompt(
        "",
        type=click.Choice(["l", "n", "x", "s", "q"]),
        prompt_suffix=" ",
    )
    if action == "q":
        return None, None
    if action == "s":
        return {
            "observed_product_id": queue_row["observed_product_id"],
            "canonical_product_id": "",
            "resolution_action": "skip",
            "status": "pending",
            "resolution_notes": queue_row.get("resolution_notes", ""),
            "reviewed_at": str(date.today()),
        }, None
    if action == "x":
        notes = click.prompt(
            click.style("exclude notes", fg=PROMPT_COLOR),
            default="",
            show_default=False,
        )
        return {
            "observed_product_id": queue_row["observed_product_id"],
            "canonical_product_id": "",
            "resolution_action": "exclude",
            "status": "approved",
            "resolution_notes": notes,
            "reviewed_at": str(date.today()),
        }, None
    if action == "l":
        display_rows = suggestions or [
            {
                "canonical_product_id": row["canonical_product_id"],
                "canonical_name": row["canonical_name"],
                "reason": "catalog sample",
            }
            for row in catalog_rows[:10]
        ]
        while True:
            canonical_product_id, outcome = choose_existing_canonical(
                display_rows,
                observed_label,
                matched_count,
            )
            if outcome == "skip":
                return {
                    "observed_product_id": queue_row["observed_product_id"],
                    "canonical_product_id": "",
                    "resolution_action": "skip",
                    "status": "pending",
                    "resolution_notes": queue_row.get("resolution_notes", ""),
                    "reviewed_at": str(date.today()),
                }, None
            if outcome == "quit":
                return None, None
            if outcome == "back":
                continue
            break
        notes = click.prompt(click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False)
        return {
            "observed_product_id": queue_row["observed_product_id"],
            "canonical_product_id": canonical_product_id,
            "resolution_action": "link",
            "status": "approved",
            "resolution_notes": notes,
            "reviewed_at": str(date.today()),
        }, None
    canonical_name = click.prompt(click.style("canonical name", fg=PROMPT_COLOR), type=str)
    category = click.prompt(
        click.style("category", fg=PROMPT_COLOR),
        default="",
        show_default=False,
    )
    product_type = click.prompt(
        click.style("product type", fg=PROMPT_COLOR),
        default="",
        show_default=False,
    )
    notes = click.prompt(
        click.style("notes", fg=PROMPT_COLOR),
        default="",
        show_default=False,
    )
    canonical_product_id = stable_id("gcan", f"manual|{canonical_name}|{category}|{product_type}")
    canonical_row = {
        "canonical_product_id": canonical_product_id,
        "canonical_name": canonical_name,
        "category": category,
        "product_type": product_type,
        "brand": "",
        "variant": "",
        "size_value": "",
        "size_unit": "",
        "pack_qty": "",
        "measure_type": "",
        "notes": notes,
        "created_at": str(date.today()),
        "updated_at": str(date.today()),
    }
    resolution_row = {
        "observed_product_id": queue_row["observed_product_id"],
        "canonical_product_id": canonical_product_id,
        "resolution_action": "create",
        "status": "approved",
        "resolution_notes": notes,
        "reviewed_at": str(date.today()),
    }
    return resolution_row, canonical_row


@click.command()
@click.option("--purchases-csv", default="combined_output/purchases.csv", show_default=True)
@click.option("--queue-csv", default="combined_output/review_queue.csv", show_default=True)
@click.option("--resolutions-csv", default="combined_output/review_resolutions.csv", show_default=True)
@click.option("--catalog-csv", default="combined_output/canonical_catalog.csv", show_default=True)
@click.option("--limit", default=0, show_default=True, type=int)
@click.option("--refresh-only", is_flag=True, help="Only rebuild review_queue.csv without prompting.")
def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, limit, refresh_only):
    purchase_rows = build_purchases.read_optional_csv_rows(purchases_csv)
    resolution_rows = build_purchases.read_optional_csv_rows(resolutions_csv)
    catalog_rows = build_purchases.read_optional_csv_rows(catalog_csv)
    queue_rows = build_review_queue(purchase_rows, resolution_rows)
    write_csv_rows(queue_csv, queue_rows, QUEUE_FIELDS)
    click.echo(f"wrote {len(queue_rows)} rows to {queue_csv}")
    if refresh_only:
        return
    resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows)
    catalog_by_id = {row["canonical_product_id"]: row for row in catalog_rows if row.get("canonical_product_id")}
    rows_by_observed = defaultdict(list)
    for row in purchase_rows:
        observed_product_id = row.get("observed_product_id", "")
        if observed_product_id:
            rows_by_observed[observed_product_id].append(row)
    reviewed = 0
    for index, queue_row in enumerate(queue_rows, start=1):
        if limit and reviewed >= limit:
            break
        related_rows = rows_by_observed.get(queue_row["observed_product_id"], [])
        result = prompt_resolution(queue_row, related_rows, catalog_rows, index, len(queue_rows))
        if result == (None, None):
            break
        resolution_row, canonical_row = result
        resolution_lookup[resolution_row["observed_product_id"]] = resolution_row
        if canonical_row and canonical_row["canonical_product_id"] not in catalog_by_id:
            catalog_by_id[canonical_row["canonical_product_id"]] = canonical_row
            catalog_rows.append(canonical_row)
        reviewed += 1
    save_resolution_rows(resolutions_csv, sorted(resolution_lookup.values(), key=lambda row: row["observed_product_id"]))
    save_catalog_rows(catalog_csv, sorted(catalog_by_id.values(), key=lambda row: row["canonical_product_id"]))
    click.echo(
        f"saved {len(resolution_lookup)} resolution rows to {resolutions_csv} "
        f"and {len(catalog_by_id)} catalog rows to {catalog_csv}"
    )


if __name__ == "__main__":
    main()


@@ -1,15 +1,21 @@
+import os
 import csv
 import json
 import time
 import re
+from pathlib import Path
 from calendar import monthrange
 from datetime import datetime, timedelta
-from pathlib import Path
+from dotenv import load_dotenv
 import click
 from curl_cffi import requests
-from retailer_sessions import load_costco_session
+from browser_session import (
+    find_firefox_profile_dir,
+    load_firefox_cookies,
+    read_firefox_local_storage,
+    read_firefox_webapps_store,
+)
 BASE_URL = "https://ecom-api.costco.com/ebusiness/order/v1/orders/graphql"
 RETAILER = "costco"
@@ -209,6 +215,19 @@ ITEM_FIELDS = [
"is_coupon_line", "is_coupon_line",
 ]
+COSTCO_STORAGE_ORIGIN = "costco.com"
+COSTCO_ID_TOKEN_STORAGE_KEY = "idToken"
+COSTCO_CLIENT_ID_STORAGE_KEY = "clientID"
+def load_config():
+    load_dotenv()
+    return {
+        "authorization": os.getenv("COSTCO_X_AUTHORIZATION", "").strip(),
+        "client_id": os.getenv("COSTCO_X_WCS_CLIENTID", "").strip(),
+        "client_identifier": os.getenv("COSTCO_CLIENT_IDENTIFIER", "").strip(),
+    }
 def build_headers(auth_headers):
     headers = {
         "accept": "*/*",
@@ -225,11 +244,50 @@ def build_headers(auth_headers):
     headers.update(auth_headers)
     return headers
-def build_session():
-    retailer_session = load_costco_session()
+def load_costco_browser_headers(profile_dir, authorization, client_id, client_identifier):
+    local_storage = read_firefox_local_storage(profile_dir, COSTCO_STORAGE_ORIGIN)
+    webapps_store = read_firefox_webapps_store(profile_dir, COSTCO_STORAGE_ORIGIN)
+    auth_header = authorization.strip() if authorization else ""
+    if client_id:
+        client_id = client_id.strip()
+    if client_identifier:
+        client_identifier = client_identifier.strip()
+    if not auth_header:
+        id_token = (
+            local_storage.get(COSTCO_ID_TOKEN_STORAGE_KEY, "").strip()
+            or webapps_store.get(COSTCO_ID_TOKEN_STORAGE_KEY, "").strip()
+        )
+        if id_token:
+            auth_header = f"Bearer {id_token}"
+    client_id = client_id or (
+        local_storage.get(COSTCO_CLIENT_ID_STORAGE_KEY, "").strip()
+        or webapps_store.get(COSTCO_CLIENT_ID_STORAGE_KEY, "").strip()
+    )
+    if not auth_header:
+        raise click.ClickException(
+            "could not find Costco auth token; set COSTCO_X_AUTHORIZATION or load Firefox idToken"
+        )
+    if not client_id or not client_identifier:
+        raise click.ClickException(
+            "missing Costco client ids; set COSTCO_X_WCS_CLIENTID and COSTCO_CLIENT_IDENTIFIER"
+        )
+    return {
+        "costco-x-authorization": auth_header,
+        "costco-x-wcs-clientId": client_id,
+        "client-identifier": client_identifier,
+    }
+def build_session(profile_dir, auth_headers):
     session = requests.Session()
-    session.cookies.update(retailer_session.cookies)
-    session.headers.update(build_headers(retailer_session.headers))
+    session.cookies.update(load_firefox_cookies(".costco.com", profile_dir))
+    session.headers.update(build_headers(auth_headers))
+    session.headers.update(auth_headers)
     return session
@@ -247,7 +305,7 @@ def graphql_post(session, query, variables):
             last_response = response
             if response.status_code == 200:
                 return response.json()
-            click.echo(f"retry {attempt + 1}/3 status={response.status_code}")
+            click.echo(f"retry {attempt + 1}/3 status={response.status_code} body={response.text[:500]}")
         except Exception as exc:  # pragma: no cover - network error path
             click.echo(f"retry {attempt + 1}/3 error={exc}")
         time.sleep(3)
@@ -578,15 +636,48 @@ def write_csv(path, rows, fieldnames):
     type=int,
     help="How many months of receipts to enumerate back from today.",
 )
-def main(outdir, document_type, document_sub_type, window_days, months_back):
+@click.option(
+    "--firefox-profile-dir",
+    default=None,
+    help="Firefox profile directory to use for cookies and session storage.",
+)
+def main(
+    outdir,
+    document_type,
+    document_sub_type,
+    window_days,
+    months_back,
+    firefox_profile_dir,
+):
     outdir = Path(outdir)
     raw_dir = outdir / "raw"
-    try:
-        session = build_session()
-    except Exception as exc:
-        raise click.ClickException(
-            f"failed to load Costco browser session: {exc}"
-        ) from exc
+    config = load_config()
+    profile_dir = Path(firefox_profile_dir) if firefox_profile_dir else None
+    if profile_dir is None:
+        try:
+            profile_dir = find_firefox_profile_dir()
+        except Exception:
+            profile_dir = click.prompt(
+                "Firefox profile dir",
+                type=click.Path(exists=True, file_okay=False, path_type=Path),
+            )
+    auth_headers = load_costco_browser_headers(
+        profile_dir,
+        authorization=config["authorization"],
+        client_id=config["client_id"],
+        client_identifier=config["client_identifier"],
+    )
+    session = build_session(profile_dir, auth_headers)
+    click.echo(
+        "session bootstrap: "
+        f"cookies={True} "
+        f"authorization={bool(auth_headers.get('costco-x-authorization'))} "
+        f"client_id={bool(auth_headers.get('costco-x-wcs-clientId'))} "
+        f"client_identifier={bool(auth_headers.get('client-identifier'))}"
+    )
     start_date, end_date = resolve_date_range(months_back)
     summary_payload, request_metadata = fetch_summary_windows(
@@ -623,3 +714,4 @@ def main(outdir, document_type, document_sub_type, window_days, months_back):
 if __name__ == "__main__":
     main()


@@ -8,7 +8,7 @@ import click
 from dotenv import load_dotenv
 from curl_cffi import requests
-from retailer_sessions import load_giant_session
+from browser_session import find_firefox_profile_dir, load_firefox_cookies
 BASE = "https://giantfood.com"
@@ -67,9 +67,9 @@ def load_config():
def build_session(): def build_session():
browser_session = load_giant_session() profile_dir = find_firefox_profile_dir()
session = requests.Session() session = requests.Session()
session.cookies.update(browser_session.cookies) session.cookies.update(load_firefox_cookies("giantfood.com", profile_dir))
session.headers.update( session.headers.update(
{ {
"user-agent": ( "user-agent": (


@@ -2,13 +2,14 @@ import sqlite3
 import tempfile
 import unittest
 from pathlib import Path
+from unittest import mock
 import browser_session
-import retailer_sessions
+import scrape_costco
 class BrowserSessionTests(unittest.TestCase):
-    def test_read_firefox_ls_entries_reads_storage_from_copied_sqlite(self):
+    def test_read_firefox_local_storage_reads_copied_sqlite(self):
         with tempfile.TemporaryDirectory() as tmpdir:
             profile_dir = Path(tmpdir) / "abcd.default-release"
             ls_dir = profile_dir / "storage" / "default" / "https+++www.costco.com" / "ls"
@@ -19,33 +20,43 @@ class BrowserSessionTests(unittest.TestCase):
                 connection.execute("CREATE TABLE data (key TEXT, value TEXT)")
                 connection.execute(
                     "INSERT INTO data (key, value) VALUES (?, ?)",
-                    ("session", '{"costco":{"clientIdentifier":"481b1aec-aa3b-454b-b81b-48187e28f205"}}'),
+                    ("costco-x-wcs-clientId", "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf"),
                 )
-            entries = browser_session.read_firefox_storage_entries(
+            values = browser_session.read_firefox_local_storage(
                 profile_dir,
-                origin_filters=["costco.com"],
+                origin_filter="costco.com",
             )
-            self.assertEqual(1, len(entries))
-            self.assertEqual("https://www.costco.com", entries[0].origin)
-            self.assertEqual("session", entries[0].key)
+            self.assertEqual(
+                "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",
+                values["costco-x-wcs-clientId"],
-    def test_extract_costco_headers_from_storage_json(self):
-        entries = [
-            browser_session.StorageEntry(
-                origin="https://www.costco.com",
-                key="authState",
-                value=(
-                    '{"authorization":"Bearer header.payload.signature",'
-                    '"wcsClientId":"4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",'
-                    '"clientIdentifier":"481b1aec-aa3b-454b-b81b-48187e28f205"}'
-                ),
-                source="memory",
             )
-        ]
-        headers = retailer_sessions.extract_costco_headers(entries)
+    def test_load_costco_browser_headers_reads_id_token_and_client_id(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            profile_dir = Path(tmpdir)
+            storage_dir = profile_dir / "storage" / "default" / "https+++www.costco.com" / "ls"
+            storage_dir.mkdir(parents=True)
+            db_path = storage_dir / "data.sqlite"
+            with sqlite3.connect(db_path) as connection:
+                connection.execute("CREATE TABLE data (key TEXT, value TEXT)")
+                connection.execute(
+                    "INSERT INTO data (key, value) VALUES (?, ?)",
+                    ("idToken", "header.payload.signature"),
+                )
+                connection.execute(
+                    "INSERT INTO data (key, value) VALUES (?, ?)",
+                    ("clientID", "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf"),
+                )
+            headers = scrape_costco.load_costco_browser_headers(
+                profile_dir,
+                authorization="",
+                client_id="",
+                client_identifier="481b1aec-aa3b-454b-b81b-48187e28f205",
+            )
         self.assertEqual("Bearer header.payload.signature", headers["costco-x-authorization"])
         self.assertEqual(
@@ -57,6 +68,88 @@ class BrowserSessionTests(unittest.TestCase):
             headers["client-identifier"],
         )
+    def test_load_costco_browser_headers_prefers_env_values(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            profile_dir = Path(tmpdir)
+            storage_dir = profile_dir / "storage" / "default" / "https+++www.costco.com" / "ls"
+            storage_dir.mkdir(parents=True)
+            db_path = storage_dir / "data.sqlite"
+            with sqlite3.connect(db_path) as connection:
+                connection.execute("CREATE TABLE data (key TEXT, value TEXT)")
+                connection.execute(
+                    "INSERT INTO data (key, value) VALUES (?, ?)",
+                    ("idToken", "storage.payload.signature"),
+                )
+                connection.execute(
+                    "INSERT INTO data (key, value) VALUES (?, ?)",
+                    ("clientID", "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf"),
+                )
+            headers = scrape_costco.load_costco_browser_headers(
+                profile_dir,
+                authorization="Bearer env.payload.signature",
+                client_id="env-client-id",
+                client_identifier="481b1aec-aa3b-454b-b81b-48187e28f205",
+            )
+        self.assertEqual("Bearer env.payload.signature", headers["costco-x-authorization"])
+        self.assertEqual("env-client-id", headers["costco-x-wcs-clientId"])
+    def test_scrape_costco_prompts_for_profile_dir_when_autodiscovery_fails(self):
+        with mock.patch.object(
+            scrape_costco,
+            "find_firefox_profile_dir",
+            side_effect=FileNotFoundError("no default profile"),
+        ), mock.patch.object(
+            scrape_costco.click,
+            "prompt",
+            return_value=Path("/tmp/profile"),
+        ) as mocked_prompt, mock.patch.object(
+            scrape_costco,
+            "load_config",
+            return_value={
+                "authorization": "",
+                "client_id": "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",
+                "client_identifier": "481b1aec-aa3b-454b-b81b-48187e28f205",
+            },
+        ), mock.patch.object(
+            scrape_costco,
+            "load_costco_browser_headers",
+            return_value={
+                "costco-x-authorization": "Bearer header.payload.signature",
+                "costco-x-wcs-clientId": "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",
+                "client-identifier": "481b1aec-aa3b-454b-b81b-48187e28f205",
+            },
+        ), mock.patch.object(
+            scrape_costco,
+            "build_session",
+            return_value=object(),
+        ), mock.patch.object(
+            scrape_costco,
+            "fetch_summary_windows",
+            return_value=(
+                {"data": {"receiptsWithCounts": {"receipts": []}}},
+                [],
+            ),
+        ), mock.patch.object(
+            scrape_costco,
+            "write_json",
+        ), mock.patch.object(
+            scrape_costco,
+            "write_csv",
+        ):
+            scrape_costco.main.callback(
+                outdir="/tmp/costco_output",
+                document_type="all",
+                document_sub_type="all",
+                window_days=92,
+                months_back=3,
+                firefox_profile_dir=None,
+            )
+        mocked_prompt.assert_called_once()
 if __name__ == "__main__":
     unittest.main()
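
The tests above exercise reading Firefox local storage from a copied `data.sqlite`. A minimal sketch of that idea follows, assuming the simplified `data (key, value)` table the tests create; a real Firefox profile may encode or compress stored values, which this sketch does not handle. The name `read_local_storage_sketch` is hypothetical and is not the `browser_session` implementation.

```python
import shutil
import sqlite3
import tempfile
from pathlib import Path


def read_local_storage_sketch(profile_dir, origin_filter):
    """Hypothetical reader: return {key: value} for origins matching the filter."""
    values = {}
    storage_root = Path(profile_dir) / "storage" / "default"
    for db_path in storage_root.glob("*/ls/data.sqlite"):
        origin_dir = db_path.parent.parent.name  # e.g. "https+++www.costco.com"
        if origin_filter not in origin_dir:
            continue
        with tempfile.TemporaryDirectory() as tmpdir:
            # Work on a copy so a database held open by the browser is never touched.
            copy_path = Path(tmpdir) / "data.sqlite"
            shutil.copy2(db_path, copy_path)
            connection = sqlite3.connect(copy_path)
            try:
                for key, value in connection.execute("SELECT key, value FROM data"):
                    values[key] = value
            finally:
                connection.close()
    return values
```

Copying before opening is the design choice worth noting: it avoids lock and permission errors when the browser still has the original file open.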


@@ -411,6 +411,26 @@ class CostcoPipelineTests(unittest.TestCase):
             ]
             with mock.patch.object(
+                scrape_costco,
+                "load_config",
+                return_value={
+                    "authorization": "",
+                    "client_id": "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",
+                    "client_identifier": "481b1aec-aa3b-454b-b81b-48187e28f205",
+                },
+            ), mock.patch.object(
+                scrape_costco,
+                "find_firefox_profile_dir",
+                return_value=Path("/tmp/profile"),
+            ), mock.patch.object(
+                scrape_costco,
+                "load_costco_browser_headers",
+                return_value={
+                    "costco-x-authorization": "Bearer header.payload.signature",
+                    "costco-x-wcs-clientId": "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",
+                    "client-identifier": "481b1aec-aa3b-454b-b81b-48187e28f205",
+                },
+            ), mock.patch.object(
                 scrape_costco, "build_session", return_value=object()
             ), mock.patch.object(
                 scrape_costco,
@@ -427,6 +447,7 @@ class CostcoPipelineTests(unittest.TestCase):
                     document_sub_type="all",
                     window_days=92,
                     months_back=3,
+                    firefox_profile_dir=None,
                 )
             metadata_path = outdir / "raw" / "summary_requests.json"

301
tests/test_purchases.py Normal file

@@ -0,0 +1,301 @@
+import csv
+import tempfile
+import unittest
+from pathlib import Path
+import build_purchases
+import enrich_costco
+class PurchaseLogTests(unittest.TestCase):
+    def test_derive_metrics_prefers_picked_weight_and_pack_count(self):
+        metrics = build_purchases.derive_metrics(
+            {
+                "line_total": "4.00",
+                "qty": "1",
+                "pack_qty": "4",
+                "size_value": "",
+                "size_unit": "",
+                "picked_weight": "2",
+                "price_per_each": "",
+                "price_per_lb": "",
+                "price_per_oz": "",
+            }
+        )
+        self.assertEqual("4", metrics["price_per_each"])
+        self.assertEqual("1", metrics["price_per_count"])
+        self.assertEqual("2", metrics["price_per_lb"])
+        self.assertEqual("0.125", metrics["price_per_oz"])
+        self.assertEqual("picked_weight_lb", metrics["price_per_lb_basis"])
+    def test_build_purchase_rows_maps_canonical_ids(self):
+        fieldnames = enrich_costco.OUTPUT_FIELDS
+        giant_row = {field: "" for field in fieldnames}
+        giant_row.update(
+            {
+                "retailer": "giant",
+                "order_id": "g1",
+                "line_no": "1",
+                "observed_item_key": "giant:g1:1",
+                "order_date": "2026-03-01",
+                "item_name": "FRESH BANANA",
+                "item_name_norm": "BANANA",
+                "image_url": "https://example.test/banana.jpg",
+                "retailer_item_id": "100",
+                "upc": "4011",
+                "qty": "1",
+                "unit": "LB",
+                "line_total": "1.29",
+                "unit_price": "1.29",
+                "measure_type": "weight",
+                "price_per_lb": "1.29",
+                "raw_order_path": "giant_output/raw/g1.json",
+                "is_discount_line": "false",
+                "is_coupon_line": "false",
+                "is_fee": "false",
+            }
+        )
+        costco_row = {field: "" for field in fieldnames}
+        costco_row.update(
+            {
+                "retailer": "costco",
+                "order_id": "c1",
+                "line_no": "1",
+                "observed_item_key": "costco:c1:1",
+                "order_date": "2026-03-12",
+                "item_name": "BANANAS 3 LB / 1.36 KG",
+                "item_name_norm": "BANANA",
+                "retailer_item_id": "30669",
+                "qty": "1",
+                "unit": "E",
+                "line_total": "2.98",
+                "unit_price": "2.98",
+                "size_value": "3",
+                "size_unit": "lb",
+                "measure_type": "weight",
+                "price_per_lb": "0.9933",
+                "raw_order_path": "costco_output/raw/c1.json",
+                "is_discount_line": "false",
+                "is_coupon_line": "false",
+                "is_fee": "false",
+            }
+        )
+        giant_orders = [
+            {
+                "order_id": "g1",
+                "store_name": "Giant",
+                "store_number": "42",
+                "store_city": "Springfield",
+                "store_state": "VA",
+            }
+        ]
+        costco_orders = [
+            {
+                "order_id": "c1",
+                "store_name": "MT VERNON",
+                "store_number": "1115",
+                "store_city": "ALEXANDRIA",
+                "store_state": "VA",
+            }
+        ]
+        rows, _observed, _canon, _links = build_purchases.build_purchase_rows(
+            [giant_row],
+            [costco_row],
+            giant_orders,
+            costco_orders,
+            [],
+        )
+        self.assertEqual(2, len(rows))
+        self.assertTrue(all(row["canonical_product_id"] for row in rows))
+        self.assertEqual({"giant", "costco"}, {row["retailer"] for row in rows})
+        self.assertEqual("https://example.test/banana.jpg", rows[0]["image_url"])
+    def test_main_writes_purchase_and_example_csvs(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            giant_items = Path(tmpdir) / "giant_items.csv"
+            costco_items = Path(tmpdir) / "costco_items.csv"
+            giant_orders = Path(tmpdir) / "giant_orders.csv"
+            costco_orders = Path(tmpdir) / "costco_orders.csv"
+            resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
+            catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
+            links_csv = Path(tmpdir) / "product_links.csv"
+            purchases_csv = Path(tmpdir) / "combined" / "purchases.csv"
+            examples_csv = Path(tmpdir) / "combined" / "comparison_examples.csv"
+            fieldnames = enrich_costco.OUTPUT_FIELDS
+            giant_row = {field: "" for field in fieldnames}
+            giant_row.update(
+                {
+                    "retailer": "giant",
+                    "order_id": "g1",
+                    "line_no": "1",
+                    "observed_item_key": "giant:g1:1",
+                    "order_date": "2026-03-01",
+                    "item_name": "FRESH BANANA",
+                    "item_name_norm": "BANANA",
+                    "retailer_item_id": "100",
+                    "upc": "4011",
+                    "qty": "1",
+                    "unit": "LB",
+                    "line_total": "1.29",
+                    "unit_price": "1.29",
+                    "measure_type": "weight",
+                    "price_per_lb": "1.29",
+                    "raw_order_path": "giant_output/raw/g1.json",
+                    "is_discount_line": "false",
+                    "is_coupon_line": "false",
+                    "is_fee": "false",
+                }
+            )
+            costco_row = {field: "" for field in fieldnames}
+            costco_row.update(
+                {
+                    "retailer": "costco",
+                    "order_id": "c1",
+                    "line_no": "1",
+                    "observed_item_key": "costco:c1:1",
+                    "order_date": "2026-03-12",
+                    "item_name": "BANANAS 3 LB / 1.36 KG",
+                    "item_name_norm": "BANANA",
+                    "retailer_item_id": "30669",
+                    "qty": "1",
+                    "unit": "E",
+                    "line_total": "2.98",
+                    "unit_price": "2.98",
+                    "size_value": "3",
+                    "size_unit": "lb",
+                    "measure_type": "weight",
+                    "price_per_lb": "0.9933",
+                    "raw_order_path": "costco_output/raw/c1.json",
+                    "is_discount_line": "false",
+                    "is_coupon_line": "false",
+                    "is_fee": "false",
+                }
+            )
+            for path, source_rows in [
+                (giant_items, [giant_row]),
+                (costco_items, [costco_row]),
+            ]:
+                with path.open("w", newline="", encoding="utf-8") as handle:
+                    writer = csv.DictWriter(handle, fieldnames=fieldnames)
+                    writer.writeheader()
+                    writer.writerows(source_rows)
+            order_fields = ["order_id", "store_name", "store_number", "store_city", "store_state"]
+            for path, source_rows in [
+                (
+                    giant_orders,
+                    [
+                        {
+                            "order_id": "g1",
+                            "store_name": "Giant",
+                            "store_number": "42",
+                            "store_city": "Springfield",
+                            "store_state": "VA",
+                        }
+                    ],
+                ),
+                (
+                    costco_orders,
+                    [
+                        {
+                            "order_id": "c1",
+                            "store_name": "MT VERNON",
+                            "store_number": "1115",
+                            "store_city": "ALEXANDRIA",
+                            "store_state": "VA",
+                        }
+                    ],
+                ),
+            ]:
+                with path.open("w", newline="", encoding="utf-8") as handle:
+                    writer = csv.DictWriter(handle, fieldnames=order_fields)
+                    writer.writeheader()
+                    writer.writerows(source_rows)
+            build_purchases.main.callback(
+                giant_items_enriched_csv=str(giant_items),
+                costco_items_enriched_csv=str(costco_items),
+                giant_orders_csv=str(giant_orders),
+                costco_orders_csv=str(costco_orders),
+                resolutions_csv=str(resolutions_csv),
+                catalog_csv=str(catalog_csv),
+                links_csv=str(links_csv),
+                output_csv=str(purchases_csv),
+                examples_csv=str(examples_csv),
+            )
+            self.assertTrue(purchases_csv.exists())
+            self.assertTrue(examples_csv.exists())
+            with purchases_csv.open(newline="", encoding="utf-8") as handle:
+                purchase_rows = list(csv.DictReader(handle))
+            with examples_csv.open(newline="", encoding="utf-8") as handle:
+                example_rows = list(csv.DictReader(handle))
+            self.assertEqual(2, len(purchase_rows))
+            self.assertEqual(1, len(example_rows))
+    def test_build_purchase_rows_applies_manual_resolution(self):
+        fieldnames = enrich_costco.OUTPUT_FIELDS
+        giant_row = {field: "" for field in fieldnames}
+        giant_row.update(
+            {
+                "retailer": "giant",
+                "order_id": "g1",
+                "line_no": "1",
+                "observed_item_key": "giant:g1:1",
+                "order_date": "2026-03-01",
+                "item_name": "SB BAGGED ICE 20LB",
+                "item_name_norm": "BAGGED ICE",
+                "retailer_item_id": "100",
+                "upc": "",
+                "qty": "1",
+                "unit": "EA",
+                "line_total": "3.50",
+                "unit_price": "3.50",
+                "measure_type": "each",
+                "raw_order_path": "giant_output/raw/g1.json",
+                "is_discount_line": "false",
+                "is_coupon_line": "false",
+                "is_fee": "false",
+            }
+        )
+        observed_rows, _canonical_rows, _link_rows, _observed_id_by_key, _canonical_by_observed = (
+            build_purchases.build_link_state([giant_row])
+        )
+        observed_product_id = observed_rows[0]["observed_product_id"]
+        rows, _observed, _canon, _links = build_purchases.build_purchase_rows(
+            [giant_row],
+            [],
+            [
+                {
+                    "order_id": "g1",
+                    "store_name": "Giant",
+                    "store_number": "42",
+                    "store_city": "Springfield",
+                    "store_state": "VA",
+                }
+            ],
+            [],
+            [
+                {
+                    "observed_product_id": observed_product_id,
+                    "canonical_product_id": "gcan_manual_ice",
+                    "resolution_action": "create",
+                    "status": "approved",
+                    "resolution_notes": "manual ice merge",
+                    "reviewed_at": "2026-03-16",
+                }
+            ],
+        )
+        self.assertEqual("gcan_manual_ice", rows[0]["canonical_product_id"])
+        self.assertEqual("approved", rows[0]["review_status"])
+        self.assertEqual("create", rows[0]["resolution_action"])
+if __name__ == "__main__":
+    unittest.main()
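
The arithmetic asserted in `test_derive_metrics_prefers_picked_weight_and_pack_count` can be worked through with a small sketch. This is an illustration under assumptions, not the `build_purchases.derive_metrics` implementation: the function name `sketch_metrics`, the helper names, and the 16 oz per lb conversion step are inferred from the expected values in the test.

```python
from decimal import Decimal

OZ_PER_LB = Decimal("16")


def to_decimal(text):
    """Parse a CSV cell; blank cells become None."""
    text = (text or "").strip()
    return Decimal(text) if text else None


def fmt(value):
    """Render a Decimal without trailing zeros or exponent notation."""
    return format(value.normalize(), "f")


def sketch_metrics(row):
    """Hypothetical unit-price derivation matching the test's expected values."""
    total = to_decimal(row.get("line_total"))
    qty = to_decimal(row.get("qty"))
    pack_qty = to_decimal(row.get("pack_qty"))
    weight_lb = to_decimal(row.get("picked_weight"))
    metrics = {}
    if total is None:
        return metrics
    if qty:
        metrics["price_per_each"] = fmt(total / qty)
        if pack_qty:
            # Price per individual unit inside a multi-pack.
            metrics["price_per_count"] = fmt(total / (qty * pack_qty))
    if weight_lb:
        per_lb = total / weight_lb
        metrics["price_per_lb"] = fmt(per_lb)
        metrics["price_per_oz"] = fmt(per_lb / OZ_PER_LB)
        metrics["price_per_lb_basis"] = "picked_weight_lb"
    return metrics


example = sketch_metrics(
    {"line_total": "4.00", "qty": "1", "pack_qty": "4", "picked_weight": "2"}
)
```

With a 4.00 line total, quantity 1, pack of 4 and a picked weight of 2 lb, the sketch yields 4 per each, 1 per count, 2 per lb and 0.125 per oz, the same figures the test asserts.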


@@ -0,0 +1,409 @@
+import csv
+import tempfile
+import unittest
+from pathlib import Path
+from unittest import mock
+from click.testing import CliRunner
+import review_products
+class ReviewWorkflowTests(unittest.TestCase):
+    def test_build_review_queue_groups_unresolved_purchases(self):
+        queue_rows = review_products.build_review_queue(
+            [
+                {
+                    "observed_product_id": "gobs_1",
+                    "canonical_product_id": "",
+                    "retailer": "giant",
+                    "raw_item_name": "SB BAGGED ICE 20LB",
+                    "normalized_item_name": "BAGGED ICE",
+                    "upc": "",
+                    "line_total": "3.50",
+                },
+                {
+                    "observed_product_id": "gobs_1",
+                    "canonical_product_id": "",
+                    "retailer": "giant",
+                    "raw_item_name": "SB BAG ICE CUBED 10LB",
+                    "normalized_item_name": "BAG ICE",
+                    "upc": "",
+                    "line_total": "2.50",
+                },
+            ],
+            [],
+        )
+        self.assertEqual(1, len(queue_rows))
+        self.assertEqual("gobs_1", queue_rows[0]["observed_product_id"])
+        self.assertIn("SB BAGGED ICE 20LB", queue_rows[0]["raw_item_names"])
+    def test_build_canonical_suggestions_prefers_upc_then_name(self):
+        suggestions = review_products.build_canonical_suggestions(
+            [
+                {
+                    "normalized_item_name": "MIXED PEPPER",
+                    "upc": "12345",
+                }
+            ],
+            [
+                {
+                    "canonical_product_id": "gcan_1",
+                    "canonical_name": "MIXED PEPPER",
+                    "upc": "",
+                },
+                {
+                    "canonical_product_id": "gcan_2",
+                    "canonical_name": "MIXED PEPPER 6 PACK",
+                    "upc": "12345",
+                },
+            ],
+        )
+        self.assertEqual("gcan_2", suggestions[0]["canonical_product_id"])
+        self.assertEqual("exact upc", suggestions[0]["reason"])
+        self.assertEqual("gcan_1", suggestions[1]["canonical_product_id"])
+    def test_review_products_displays_position_items_and_suggestions(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            purchases_csv = Path(tmpdir) / "purchases.csv"
+            queue_csv = Path(tmpdir) / "review_queue.csv"
+            resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
+            catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
+            purchase_fields = [
+                "purchase_date",
+                "retailer",
+                "order_id",
+                "line_no",
+                "observed_product_id",
+                "canonical_product_id",
+                "raw_item_name",
+                "normalized_item_name",
+                "image_url",
+                "upc",
+                "line_total",
+            ]
+            with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
+                writer = csv.DictWriter(handle, fieldnames=purchase_fields)
+                writer.writeheader()
+                writer.writerows(
+                    [
+                        {
+                            "purchase_date": "2026-03-14",
+                            "retailer": "costco",
+                            "order_id": "c2",
+                            "line_no": "2",
+                            "observed_product_id": "gobs_mix",
+                            "canonical_product_id": "",
+                            "raw_item_name": "MIXED PEPPER 6-PACK",
+                            "normalized_item_name": "MIXED PEPPER",
+                            "image_url": "",
+                            "upc": "",
+                            "line_total": "7.49",
+                        },
+                        {
+                            "purchase_date": "2026-03-12",
+                            "retailer": "costco",
+                            "order_id": "c1",
+                            "line_no": "1",
+                            "observed_product_id": "gobs_mix",
+                            "canonical_product_id": "",
+                            "raw_item_name": "MIXED PEPPER 6-PACK",
+                            "normalized_item_name": "MIXED PEPPER",
+                            "image_url": "https://example.test/mixed-pepper.jpg",
+                            "upc": "",
+                            "line_total": "6.99",
+                        },
+                    ]
+                )
+            with catalog_csv.open("w", newline="", encoding="utf-8") as handle:
+                writer = csv.DictWriter(handle, fieldnames=review_products.build_purchases.CATALOG_FIELDS)
+                writer.writeheader()
+                writer.writerow(
+                    {
+                        "canonical_product_id": "gcan_mix",
+                        "canonical_name": "MIXED PEPPER",
+                        "category": "produce",
+                        "product_type": "pepper",
+                        "brand": "",
+                        "variant": "",
+                        "size_value": "",
+                        "size_unit": "",
+                        "pack_qty": "",
+                        "measure_type": "",
+                        "notes": "",
+                        "created_at": "",
+                        "updated_at": "",
+                    }
+                )
+            runner = CliRunner()
+            result = runner.invoke(
+                review_products.main,
+                [
+                    "--purchases-csv",
+                    str(purchases_csv),
+                    "--queue-csv",
+                    str(queue_csv),
+                    "--resolutions-csv",
+                    str(resolutions_csv),
+                    "--catalog-csv",
+                    str(catalog_csv),
+                ],
+                input="q\n",
+                color=True,
+            )
+        self.assertEqual(0, result.exit_code)
+        self.assertIn("Review 1/1: Resolve observed_product MIXED PEPPER to canonical_name [__]?", result.output)
+        self.assertIn("2 matched items:", result.output)
+        self.assertIn("[l]ink existing [n]ew canonical e[x]clude [s]kip [q]uit:", result.output)
+        first_item = result.output.index("[1] 2026-03-14 | 7.49")
+        second_item = result.output.index("[2] 2026-03-12 | 6.99")
+        self.assertLess(first_item, second_item)
+        self.assertIn("https://example.test/mixed-pepper.jpg", result.output)
+        self.assertIn("1 canonical suggestions found:", result.output)
+        self.assertIn("[1] MIXED PEPPER", result.output)
+        self.assertIn("\x1b[", result.output)
+    def test_review_products_no_suggestions_is_informational(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            purchases_csv = Path(tmpdir) / "purchases.csv"
+            queue_csv = Path(tmpdir) / "review_queue.csv"
+            resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
+            catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
+            with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
+                writer = csv.DictWriter(
+                    handle,
+                    fieldnames=[
+                        "purchase_date",
+                        "retailer",
+                        "order_id",
+                        "line_no",
+                        "observed_product_id",
+                        "canonical_product_id",
+                        "raw_item_name",
+                        "normalized_item_name",
+                        "image_url",
+                        "upc",
+                        "line_total",
+                    ],
+                )
+                writer.writeheader()
+                writer.writerow(
+                    {
+                        "purchase_date": "2026-03-14",
+                        "retailer": "giant",
+                        "order_id": "g1",
+                        "line_no": "1",
+                        "observed_product_id": "gobs_ice",
+                        "canonical_product_id": "",
+                        "raw_item_name": "SB BAGGED ICE 20LB",
+                        "normalized_item_name": "BAGGED ICE",
+                        "image_url": "",
+                        "upc": "",
+                        "line_total": "3.50",
+                    }
+                )
+            with catalog_csv.open("w", newline="", encoding="utf-8") as handle:
+                writer = csv.DictWriter(handle, fieldnames=review_products.build_purchases.CATALOG_FIELDS)
+                writer.writeheader()
+            result = CliRunner().invoke(
+                review_products.main,
+                [
+                    "--purchases-csv",
+                    str(purchases_csv),
+                    "--queue-csv",
+                    str(queue_csv),
+                    "--resolutions-csv",
+                    str(resolutions_csv),
+                    "--catalog-csv",
+                    str(catalog_csv),
+                ],
+                input="q\n",
+                color=True,
+            )
+        self.assertEqual(0, result.exit_code)
+        self.assertIn("no canonical_name suggestions found", result.output)
+    def test_link_existing_uses_numbered_selection_and_confirmation(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            purchases_csv = Path(tmpdir) / "purchases.csv"
+            queue_csv = Path(tmpdir) / "review_queue.csv"
+            resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
+            catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
+            with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
+                writer = csv.DictWriter(
+                    handle,
+                    fieldnames=[
+                        "purchase_date",
+                        "retailer",
+                        "order_id",
+                        "line_no",
+                        "observed_product_id",
+                        "canonical_product_id",
+                        "raw_item_name",
+                        "normalized_item_name",
+                        "image_url",
+                        "upc",
+                        "line_total",
+                    ],
+                )
+                writer.writeheader()
+                writer.writerows(
+                    [
+                        {
+                            "purchase_date": "2026-03-14",
+                            "retailer": "costco",
+                            "order_id": "c2",
+                            "line_no": "2",
+                            "observed_product_id": "gobs_mix",
+                            "canonical_product_id": "",
+                            "raw_item_name": "MIXED PEPPER 6-PACK",
+                            "normalized_item_name": "MIXED PEPPER",
+                            "image_url": "",
+                            "upc": "",
+                            "line_total": "7.49",
+                        },
+                        {
+                            "purchase_date": "2026-03-12",
+                            "retailer": "costco",
+                            "order_id": "c1",
+                            "line_no": "1",
+                            "observed_product_id": "gobs_mix",
+                            "canonical_product_id": "",
+                            "raw_item_name": "MIXED PEPPER 6-PACK",
+                            "normalized_item_name": "MIXED PEPPER",
+                            "image_url": "",
+                            "upc": "",
+                            "line_total": "6.99",
+                        },
+                    ]
+                )
+            with catalog_csv.open("w", newline="", encoding="utf-8") as handle:
+                writer = csv.DictWriter(handle, fieldnames=review_products.build_purchases.CATALOG_FIELDS)
+                writer.writeheader()
+                writer.writerow(
+                    {
+                        "canonical_product_id": "gcan_mix",
+                        "canonical_name": "MIXED PEPPER",
+                        "category": "",
+                        "product_type": "",
+                        "brand": "",
+                        "variant": "",
+                        "size_value": "",
+                        "size_unit": "",
+                        "pack_qty": "",
+                        "measure_type": "",
+                        "notes": "",
+                        "created_at": "",
+                        "updated_at": "",
+                    }
+                )
+            result = CliRunner().invoke(
+                review_products.main,
+                [
+                    "--purchases-csv",
+                    str(purchases_csv),
+                    "--queue-csv",
+                    str(queue_csv),
+                    "--resolutions-csv",
+                    str(resolutions_csv),
+                    "--catalog-csv",
+                    str(catalog_csv),
+                    "--limit",
+                    "1",
+                ],
+                input="l\n1\ny\nlinked by test\n",
+                color=True,
+            )
+            self.assertEqual(0, result.exit_code)
+            self.assertIn("Select the canonical_name to associate 2 items with:", result.output)
+            self.assertIn('[1] MIXED PEPPER | gcan_mix', result.output)
+            self.assertIn('2 "MIXED PEPPER" items and future matches will be associated with "MIXED PEPPER".', result.output)
+            self.assertIn("actions: [y]es [n]o [b]ack [s]kip [q]uit", result.output)
+            with resolutions_csv.open(newline="", encoding="utf-8") as handle:
+                rows = list(csv.DictReader(handle))
+            self.assertEqual("gcan_mix", rows[0]["canonical_product_id"])
+            self.assertEqual("link", rows[0]["resolution_action"])
+    def test_review_products_creates_canonical_and_resolution(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            purchases_csv = Path(tmpdir) / "purchases.csv"
+            queue_csv = Path(tmpdir) / "review_queue.csv"
+            resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
+            catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
+            with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
+                writer = csv.DictWriter(
+                    handle,
+                    fieldnames=[
+                        "purchase_date",
+                        "observed_product_id",
+                        "canonical_product_id",
+                        "retailer",
+                        "raw_item_name",
+                        "normalized_item_name",
+                        "image_url",
+                        "upc",
+                        "line_total",
+                        "order_id",
+                        "line_no",
+                    ],
+                )
+                writer.writeheader()
+                writer.writerow(
+                    {
+                        "purchase_date": "2026-03-15",
+                        "observed_product_id": "gobs_ice",
+                        "canonical_product_id": "",
+                        "retailer": "giant",
+                        "raw_item_name": "SB BAGGED ICE 20LB",
+                        "normalized_item_name": "BAGGED ICE",
+                        "image_url": "",
+                        "upc": "",
+                        "line_total": "3.50",
+                        "order_id": "g1",
+                        "line_no": "1",
+                    }
+                )
+            with mock.patch.object(
+                review_products.click,
+                "prompt",
+                side_effect=["n", "ICE", "frozen", "ice", "manual merge", "q"],
+            ):
+                review_products.main.callback(
+                    purchases_csv=str(purchases_csv),
+                    queue_csv=str(queue_csv),
+                    resolutions_csv=str(resolutions_csv),
+                    catalog_csv=str(catalog_csv),
+                    limit=1,
+                    refresh_only=False,
+                )
+            self.assertTrue(queue_csv.exists())
+            self.assertTrue(resolutions_csv.exists())
+            self.assertTrue(catalog_csv.exists())
+            with resolutions_csv.open(newline="", encoding="utf-8") as handle:
+                resolution_rows = list(csv.DictReader(handle))
+            with catalog_csv.open(newline="", encoding="utf-8") as handle:
+                catalog_rows = list(csv.DictReader(handle))
+            self.assertEqual("create", resolution_rows[0]["resolution_action"])
+            self.assertEqual("approved", resolution_rows[0]["status"])
+            self.assertEqual("ICE", catalog_rows[0]["canonical_name"])
+if __name__ == "__main__":
+    unittest.main()
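
The ordering asserted in `test_build_canonical_suggestions_prefers_upc_then_name` (UPC matches first, then name matches) can be sketched as below. This is a guess at the ranking rule, not the `review_products.build_canonical_suggestions` implementation: the name `suggest_canonicals_sketch` is hypothetical, and the `"exact name"` reason string is an assumption, since the tests only pin down `"exact upc"`.

```python
def suggest_canonicals_sketch(purchase_rows, catalog_rows):
    """Hypothetical ranking: exact UPC matches first, then exact name matches."""
    upcs = {row["upc"] for row in purchase_rows if row.get("upc")}
    names = {
        row["normalized_item_name"]
        for row in purchase_rows
        if row.get("normalized_item_name")
    }
    upc_hits = []
    name_hits = []
    for entry in catalog_rows:
        suggestion = {
            "canonical_product_id": entry["canonical_product_id"],
            "canonical_name": entry["canonical_name"],
        }
        if entry.get("upc") and entry["upc"] in upcs:
            upc_hits.append({**suggestion, "reason": "exact upc"})
        elif entry.get("canonical_name") in names:
            # Assumed label; the source tests do not state the name-match reason.
            name_hits.append({**suggestion, "reason": "exact name"})
    return upc_hits + name_hits


ranked = suggest_canonicals_sketch(
    [{"normalized_item_name": "MIXED PEPPER", "upc": "12345"}],
    [
        {"canonical_product_id": "gcan_1", "canonical_name": "MIXED PEPPER", "upc": ""},
        {"canonical_product_id": "gcan_2", "canonical_name": "MIXED PEPPER 6 PACK", "upc": "12345"},
    ],
)
```

Collecting the two kinds of hit in separate lists keeps catalog order stable within each tier while still putting the stronger UPC evidence first.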