Compare commits

1 Commits

Author SHA1 Message Date
bf2934f487 added giant_output/ to .gitignore 2026-03-15 14:28:46 -04:00
29 changed files with 591 additions and 4672 deletions

1
.gitignore vendored
View File

@@ -21,6 +21,7 @@ env/
# --- project private data ---
/private/
giant_output/
# --- django ---
db.sqlite3

103
README.md
View File

@@ -1,103 +0,0 @@
# scrape-giant
Small grocery-history pipeline for Giant receipts.
The project currently does four things:
1. scrape Giant in-store order history from an active Firefox session
2. enrich raw line items into a deterministic `items_enriched.csv`
3. aggregate retailer-facing observed products and build a manual review queue
4. create a first-pass canonical product layer plus conservative auto-links
The work so far is Giant-specific on the ingest side and intentionally simple on
the shared product-model side.
## Current flow
Run the commands from the repo root with the project venv active, or call them
directly through `./venv/bin/python`.
```bash
./venv/bin/python scraper.py
./venv/bin/python enrich_giant.py
./venv/bin/python build_observed_products.py
./venv/bin/python build_review_queue.py
./venv/bin/python build_canonical_layer.py
```
## Inputs
- Firefox cookies for `giantfood.com`
- `GIANT_USER_ID` and `GIANT_LOYALTY_NUMBER` in `.env` (example below), shell env, or prompts
- Giant raw order payloads in `giant_output/raw/`
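A hypothetical `.env` (values illustrative only):

```bash
# hypothetical values
GIANT_USER_ID=12345678
GIANT_LOYALTY_NUMBER=48700012345678
```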
## Outputs
Current generated files live under `giant_output/`:
- `orders.csv`: flattened visit/order rows from the Giant history API
- `items.csv`: flattened raw line items from fetched order detail payloads
- `items_enriched.csv`: deterministic parsed/enriched line items
- `products_observed.csv`: retailer-facing observed product groups
- `review_queue.csv`: products needing manual review
- `products_canonical.csv`: shared canonical product rows
- `product_links.csv`: observed-to-canonical links
Raw json remains the source of truth:
- `giant_output/raw/history.json`
- `giant_output/raw/<order_id>.json`
## Scripts
- `scraper.py`: fetches Giant history/detail payloads and updates `orders.csv` and `items.csv`
- `enrich_giant.py`: reads raw Giant order json and writes `items_enriched.csv`
- `build_observed_products.py`: groups enriched rows into `products_observed.csv`
- `build_review_queue.py`: generates `review_queue.csv` and preserves review status on reruns
- `build_canonical_layer.py`: builds `products_canonical.csv` and `product_links.csv`
## Notes on the current model
- Observed products are retailer-specific (currently Giant; Costco is planned).
- Canonical products are the first cross-retailer layer.
- Auto-linking is conservative (hypothetical example after this list):
  exact UPC first, then exact normalized name plus exact size/unit context, then
  exact normalized name when there is no size context to conflict.
- Fee rows are excluded from auto-linking.
- Unknown values are left blank instead of guessed.
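For instance, an exact-UPC match yields a `product_links.csv` row shaped like this (ids and values hypothetical; first four columns shown):

```
observed_product_id,canonical_product_id,link_method,link_confidence
gobs_3f2a9b1c0d4e,gcan_9e8d7c6b5a40,exact_upc,high
```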
## Verification
Run the test suite with:
```bash
./venv/bin/python -m unittest discover -s tests
```
Useful one-off rebuilds:
```bash
./venv/bin/python enrich_giant.py
./venv/bin/python build_observed_products.py
./venv/bin/python build_review_queue.py
./venv/bin/python build_canonical_layer.py
```
## Project docs
- `pm/tasks.org`: task log and evidence
- `pm/data-model.org`: file layout and schema decisions
## Status
Completed through `t1.7`:
- Giant receipt fetch CLI
- data model and file layout
- Giant parser/enricher
- observed products
- review queue
- canonical layer scaffold
- conservative auto-link rules
Next planned task is `t1.8`: add a Costco raw ingest path.

View File

@@ -1,24 +0,0 @@
# agent rules
## priorities
- optimize for simplicity, boringness, and long-term maintainability
- prefer minimal diffs; avoid refactors unless required for the active task
## tech stack
- python; pandas or polars
- file storage: json and csv, no sqlite or databases
- assume local virtual env is available and accessible
- do not add new dependencies unless explicitly approved; if unavoidable, document justification in the active task notes
## workflow
- prefer direct argv commands (no bash -lc / compound shell chains) unless necessary
- work on ONE task at a time unless explicitly instructed otherwise
- at the start of work, state the task id you are executing
- do not start work unless a task id is specified; if missing, choose the earliest unchecked task and say so
- propose incremental steps
- always include basic tests for core logic
- when you complete a task:
- mark it [x] in pm/tasks.org
- fill in evidence with commit hash + commands run
- never mark complete unless acceptance criteria are met
- include date and time (HH:MM)

View File

@@ -1,129 +0,0 @@
import configparser
import os
import shutil
import sqlite3
import tempfile
from pathlib import Path
import browser_cookie3
def find_firefox_profile_dir():
profiles_ini = firefox_profiles_root() / "profiles.ini"
parser = configparser.RawConfigParser()
if not profiles_ini.exists():
raise FileNotFoundError(f"Firefox profiles.ini not found at {profiles_ini}")
parser.read(profiles_ini, encoding="utf-8")
profiles = []
for section in parser.sections():
if not section.startswith("Profile"):
continue
path_value = parser.get(section, "Path", fallback="")
if not path_value:
continue
is_relative = parser.getboolean(section, "IsRelative", fallback=True)
profile_path = (
profiles_ini.parent / path_value if is_relative else Path(path_value)
)
profiles.append(
(
parser.getboolean(section, "Default", fallback=False),
profile_path,
)
)
if not profiles:
raise FileNotFoundError("No Firefox profiles found in profiles.ini")
profiles.sort(key=lambda item: (not item[0], str(item[1])))
return profiles[0][1]
def firefox_profiles_root():
if os.name == "nt":
appdata = os.getenv("APPDATA", "").strip()
if not appdata:
raise FileNotFoundError("APPDATA is not set")
return Path(appdata) / "Mozilla" / "Firefox"
return Path.home() / ".mozilla" / "firefox"
def load_firefox_cookies(domain_name, profile_dir):
cookie_file = Path(profile_dir) / "cookies.sqlite"
return browser_cookie3.firefox(cookie_file=str(cookie_file), domain_name=domain_name)
def read_firefox_local_storage(profile_dir, origin_filter):
storage_root = profile_dir / "storage" / "default"
if not storage_root.exists():
return {}
for ls_path in storage_root.glob("*/ls/data.sqlite"):
origin = decode_firefox_origin(ls_path.parents[1].name)
if origin_filter.lower() not in origin.lower():
continue
return {
stringify_sql_value(row[0]): stringify_sql_value(row[1])
for row in query_sqlite(ls_path, "SELECT key, value FROM data")
}
return {}
def read_firefox_webapps_store(profile_dir, origin_filter):
webapps_path = profile_dir / "webappsstore.sqlite"
if not webapps_path.exists():
return {}
values = {}
for row in query_sqlite(
webapps_path,
"SELECT originKey, key, value FROM webappsstore2",
):
origin = stringify_sql_value(row[0])
if origin_filter.lower() not in origin.lower():
continue
values[stringify_sql_value(row[1])] = stringify_sql_value(row[2])
return values
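# Query a throwaway copy of the database so a running Firefox, which keeps
# its sqlite files locked, cannot block or corrupt the read.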
def query_sqlite(path, query):
copied_path = copy_sqlite_to_temp(path)
connection = None
cursor = None
try:
connection = sqlite3.connect(copied_path)
cursor = connection.cursor()
cursor.execute(query)
rows = cursor.fetchall()
return rows
except sqlite3.OperationalError:
return []
finally:
if cursor is not None:
cursor.close()
if connection is not None:
connection.close()
copied_path.unlink(missing_ok=True)
def copy_sqlite_to_temp(path):
fd, tmp = tempfile.mkstemp(suffix=".sqlite")
os.close(fd)
shutil.copyfile(path, tmp)
return Path(tmp)
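# Firefox names storage directories like "https+++example.com^attrs";
# strip the attribute suffix and restore the scheme separator.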
def decode_firefox_origin(raw_origin):
origin = raw_origin.split("^", 1)[0]
return origin.replace("+++", "://")
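# sqlite may hand back bytes; try common encodings before a lossy fallback.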
def stringify_sql_value(value):
if value is None:
return ""
if isinstance(value, bytes):
for encoding in ("utf-8", "utf-16-le", "utf-16"):
try:
return value.decode(encoding)
except UnicodeDecodeError:
continue
return value.decode("utf-8", errors="ignore")
return str(value)

216
build_canonical_layer.py
View File

@@ -1,216 +0,0 @@
import click
from layer_helpers import read_csv_rows, representative_value, stable_id, write_csv_rows
CANONICAL_FIELDS = [
"canonical_product_id",
"canonical_name",
"product_type",
"brand",
"variant",
"size_value",
"size_unit",
"pack_qty",
"measure_type",
"normalized_quantity",
"normalized_quantity_unit",
"notes",
"created_at",
"updated_at",
]
LINK_FIELDS = [
"observed_product_id",
"canonical_product_id",
"link_method",
"link_confidence",
"review_status",
"reviewed_by",
"reviewed_at",
"link_notes",
]
def to_float(value):
try:
return float(value)
except (TypeError, ValueError):
return None
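# Comparison basis: size_value * pack_qty in the parsed size unit, falling
# back to a bare pack count or "each" when no size was parsed.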
def normalized_quantity(row):
size_value = to_float(row.get("representative_size_value"))
pack_qty = to_float(row.get("representative_pack_qty")) or 1.0
size_unit = row.get("representative_size_unit", "")
measure_type = row.get("representative_measure_type", "")
if size_value is not None and size_unit:
return format(size_value * pack_qty, "g"), size_unit
if row.get("representative_pack_qty") and measure_type == "count":
return row["representative_pack_qty"], "count"
if measure_type == "each":
return "1", "each"
return "", ""
def auto_link_rule(observed_row):
if (
observed_row.get("is_fee") == "true"
or observed_row.get("is_discount_line") == "true"
or observed_row.get("is_coupon_line") == "true"
):
return "", "", ""
if observed_row.get("representative_upc"):
return (
"exact_upc",
f"upc={observed_row['representative_upc']}",
"high",
)
if (
observed_row.get("representative_name_norm")
and observed_row.get("representative_size_value")
and observed_row.get("representative_size_unit")
):
return (
"exact_name_size",
"|".join(
[
f"name={observed_row['representative_name_norm']}",
f"size={observed_row['representative_size_value']}",
f"unit={observed_row['representative_size_unit']}",
f"pack={observed_row['representative_pack_qty']}",
f"measure={observed_row['representative_measure_type']}",
]
),
"high",
)
if (
observed_row.get("representative_name_norm")
and not observed_row.get("representative_size_value")
and not observed_row.get("representative_size_unit")
and not observed_row.get("representative_pack_qty")
):
return (
"exact_name",
"|".join(
[
f"name={observed_row['representative_name_norm']}",
f"measure={observed_row['representative_measure_type']}",
]
),
"medium",
)
return "", "", ""
def canonical_row_for_group(canonical_product_id, group_rows, link_method):
quantity_value, quantity_unit = normalized_quantity(
{
"representative_size_value": representative_value(
group_rows, "representative_size_value"
),
"representative_size_unit": representative_value(
group_rows, "representative_size_unit"
),
"representative_pack_qty": representative_value(
group_rows, "representative_pack_qty"
),
"representative_measure_type": representative_value(
group_rows, "representative_measure_type"
),
}
)
return {
"canonical_product_id": canonical_product_id,
"canonical_name": representative_value(group_rows, "representative_name_norm"),
"product_type": "",
"brand": representative_value(group_rows, "representative_brand"),
"variant": representative_value(group_rows, "representative_variant"),
"size_value": representative_value(group_rows, "representative_size_value"),
"size_unit": representative_value(group_rows, "representative_size_unit"),
"pack_qty": representative_value(group_rows, "representative_pack_qty"),
"measure_type": representative_value(group_rows, "representative_measure_type"),
"normalized_quantity": quantity_value,
"normalized_quantity_unit": quantity_unit,
"notes": f"auto-linked via {link_method}",
"created_at": "",
"updated_at": "",
}
def build_canonical_layer(observed_rows):
canonical_rows = []
link_rows = []
groups = {}
for observed_row in sorted(observed_rows, key=lambda row: row["observed_product_id"]):
link_method, group_key, confidence = auto_link_rule(observed_row)
if not group_key:
continue
canonical_product_id = stable_id("gcan", f"{link_method}|{group_key}")
groups.setdefault(canonical_product_id, {"method": link_method, "rows": []})
groups[canonical_product_id]["rows"].append(observed_row)
link_rows.append(
{
"observed_product_id": observed_row["observed_product_id"],
"canonical_product_id": canonical_product_id,
"link_method": link_method,
"link_confidence": confidence,
"review_status": "",
"reviewed_by": "",
"reviewed_at": "",
"link_notes": "",
}
)
for canonical_product_id, group in sorted(groups.items()):
canonical_rows.append(
canonical_row_for_group(
canonical_product_id, group["rows"], group["method"]
)
)
return canonical_rows, link_rows
@click.command()
@click.option(
"--observed-csv",
default="giant_output/products_observed.csv",
show_default=True,
help="Path to observed product rows.",
)
@click.option(
"--canonical-csv",
default="giant_output/products_canonical.csv",
show_default=True,
help="Path to canonical product output.",
)
@click.option(
"--links-csv",
default="giant_output/product_links.csv",
show_default=True,
help="Path to observed-to-canonical link output.",
)
def main(observed_csv, canonical_csv, links_csv):
observed_rows = read_csv_rows(observed_csv)
canonical_rows, link_rows = build_canonical_layer(observed_rows)
write_csv_rows(canonical_csv, canonical_rows, CANONICAL_FIELDS)
write_csv_rows(links_csv, link_rows, LINK_FIELDS)
click.echo(
f"wrote {len(canonical_rows)} canonical rows to {canonical_csv} and "
f"{len(link_rows)} links to {links_csv}"
)
if __name__ == "__main__":
main()

172
build_observed_products.py
View File

@@ -1,172 +0,0 @@
from collections import defaultdict
import click
from layer_helpers import (
compact_join,
distinct_values,
first_nonblank,
read_csv_rows,
representative_value,
stable_id,
write_csv_rows,
)
OUTPUT_FIELDS = [
"observed_product_id",
"retailer",
"observed_key",
"representative_retailer_item_id",
"representative_upc",
"representative_item_name",
"representative_name_norm",
"representative_brand",
"representative_variant",
"representative_size_value",
"representative_size_unit",
"representative_pack_qty",
"representative_measure_type",
"representative_image_url",
"is_store_brand",
"is_fee",
"is_discount_line",
"is_coupon_line",
"first_seen_date",
"last_seen_date",
"times_seen",
"example_order_id",
"example_item_name",
"raw_name_examples",
"normalized_name_examples",
"example_prices",
"distinct_item_names_count",
"distinct_retailer_item_ids_count",
"distinct_upcs_count",
]
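# Grouping-key precedence: exact UPC first, then retailer item id, then a
# full parsed-context fallback for rows that have neither.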
def build_observed_key(row):
if row.get("upc"):
return "|".join(
[
row["retailer"],
f"upc={row['upc']}",
f"name={row['item_name_norm']}",
]
)
if row.get("retailer_item_id"):
return "|".join(
[
row["retailer"],
f"retailer_item_id={row['retailer_item_id']}",
f"name={row['item_name_norm']}",
f"discount={row.get('is_discount_line', 'false')}",
f"coupon={row.get('is_coupon_line', 'false')}",
]
)
return "|".join(
[
row["retailer"],
f"name={row['item_name_norm']}",
f"size={row['size_value']}",
f"unit={row['size_unit']}",
f"pack={row['pack_qty']}",
f"measure={row['measure_type']}",
f"store_brand={row['is_store_brand']}",
f"fee={row['is_fee']}",
]
)
def build_observed_products(rows):
grouped = defaultdict(list)
for row in rows:
grouped[build_observed_key(row)].append(row)
observed_rows = []
for observed_key, group_rows in sorted(grouped.items()):
ordered = sorted(
group_rows,
key=lambda row: (row["order_date"], row["order_id"], int(row["line_no"])),
)
observed_rows.append(
{
"observed_product_id": stable_id("gobs", observed_key),
"retailer": ordered[0]["retailer"],
"observed_key": observed_key,
"representative_retailer_item_id": representative_value(
ordered, "retailer_item_id"
),
"representative_upc": representative_value(ordered, "upc"),
"representative_item_name": representative_value(ordered, "item_name"),
"representative_name_norm": representative_value(
ordered, "item_name_norm"
),
"representative_brand": representative_value(ordered, "brand_guess"),
"representative_variant": representative_value(ordered, "variant"),
"representative_size_value": representative_value(ordered, "size_value"),
"representative_size_unit": representative_value(ordered, "size_unit"),
"representative_pack_qty": representative_value(ordered, "pack_qty"),
"representative_measure_type": representative_value(
ordered, "measure_type"
),
"representative_image_url": first_nonblank(ordered, "image_url"),
"is_store_brand": representative_value(ordered, "is_store_brand"),
"is_fee": representative_value(ordered, "is_fee"),
"is_discount_line": representative_value(
ordered, "is_discount_line"
),
"is_coupon_line": representative_value(ordered, "is_coupon_line"),
"first_seen_date": ordered[0]["order_date"],
"last_seen_date": ordered[-1]["order_date"],
"times_seen": str(len(ordered)),
"example_order_id": ordered[0]["order_id"],
"example_item_name": ordered[0]["item_name"],
"raw_name_examples": compact_join(
distinct_values(ordered, "item_name"), limit=4
),
"normalized_name_examples": compact_join(
distinct_values(ordered, "item_name_norm"), limit=4
),
"example_prices": compact_join(
distinct_values(ordered, "line_total"), limit=4
),
"distinct_item_names_count": str(
len(distinct_values(ordered, "item_name"))
),
"distinct_retailer_item_ids_count": str(
len(distinct_values(ordered, "retailer_item_id"))
),
"distinct_upcs_count": str(len(distinct_values(ordered, "upc"))),
}
)
observed_rows.sort(key=lambda row: row["observed_product_id"])
return observed_rows
@click.command()
@click.option(
"--items-enriched-csv",
default="giant_output/items_enriched.csv",
show_default=True,
help="Path to enriched Giant item rows.",
)
@click.option(
"--output-csv",
default="giant_output/products_observed.csv",
show_default=True,
help="Path to observed product output.",
)
def main(items_enriched_csv, output_csv):
rows = read_csv_rows(items_enriched_csv)
observed_rows = build_observed_products(rows)
write_csv_rows(output_csv, observed_rows, OUTPUT_FIELDS)
click.echo(f"wrote {len(observed_rows)} rows to {output_csv}")
if __name__ == "__main__":
main()

175
build_review_queue.py
View File

@@ -1,175 +0,0 @@
from collections import defaultdict
from datetime import date
import click
from build_observed_products import build_observed_key
from layer_helpers import compact_join, distinct_values, read_csv_rows, stable_id, write_csv_rows
OUTPUT_FIELDS = [
"review_id",
"queue_type",
"retailer",
"observed_product_id",
"canonical_product_id",
"reason_code",
"priority",
"raw_item_names",
"normalized_names",
"upc",
"image_url",
"example_prices",
"seen_count",
"status",
"resolution_notes",
"created_at",
"updated_at",
]
def existing_review_state(path):
try:
rows = read_csv_rows(path)
except FileNotFoundError:
return {}
return {row["review_id"]: row for row in rows}
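# Fee, discount, and coupon rows never enter the queue; all other rows are
# flagged for ambiguity (multiple UPCs or raw names) or missing fields.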
def review_reasons(observed_row):
reasons = []
if (
observed_row["is_fee"] == "true"
or observed_row.get("is_discount_line") == "true"
or observed_row.get("is_coupon_line") == "true"
):
return reasons
if observed_row["distinct_upcs_count"] not in {"", "0", "1"}:
reasons.append(("multiple_upcs", "high"))
if observed_row["distinct_item_names_count"] not in {"", "0", "1"}:
reasons.append(("multiple_raw_names", "medium"))
if not observed_row["representative_image_url"]:
reasons.append(("missing_image", "medium"))
if not observed_row["representative_upc"]:
reasons.append(("missing_upc", "high"))
if not observed_row["representative_name_norm"]:
reasons.append(("missing_normalized_name", "high"))
return reasons
def build_review_queue(observed_rows, item_rows, existing_rows, today_text):
by_observed = defaultdict(list)
for row in item_rows:
observed_id = row.get("observed_product_id", "")
if observed_id:
by_observed[observed_id].append(row)
queue_rows = []
for observed_row in observed_rows:
reasons = review_reasons(observed_row)
if not reasons:
continue
related_items = by_observed.get(observed_row["observed_product_id"], [])
raw_names = compact_join(distinct_values(related_items, "item_name"), limit=5)
norm_names = compact_join(
distinct_values(related_items, "item_name_norm"), limit=5
)
example_prices = compact_join(
distinct_values(related_items, "line_total"), limit=5
)
for reason_code, priority in reasons:
review_id = stable_id(
"rvw",
f"{observed_row['observed_product_id']}|{reason_code}",
)
prior = existing_rows.get(review_id, {})
queue_rows.append(
{
"review_id": review_id,
"queue_type": "observed_product",
"retailer": observed_row["retailer"],
"observed_product_id": observed_row["observed_product_id"],
"canonical_product_id": prior.get("canonical_product_id", ""),
"reason_code": reason_code,
"priority": priority,
"raw_item_names": raw_names,
"normalized_names": norm_names,
"upc": observed_row["representative_upc"],
"image_url": observed_row["representative_image_url"],
"example_prices": example_prices,
"seen_count": observed_row["times_seen"],
"status": prior.get("status", "pending"),
"resolution_notes": prior.get("resolution_notes", ""),
"created_at": prior.get("created_at", today_text),
"updated_at": today_text,
}
)
queue_rows.sort(key=lambda row: (row["priority"], row["reason_code"], row["review_id"]))
return queue_rows
def attach_observed_ids(item_rows, observed_rows):
    observed_by_key = {row["observed_key"]: row["observed_product_id"] for row in observed_rows}
    attached = []
    for row in item_rows:
        # Recompute the grouping key with the same rule that produced
        # products_observed.csv, so the lookup cannot drift out of sync.
        observed_key = build_observed_key(row)
        enriched = dict(row)
        enriched["observed_product_id"] = observed_by_key.get(observed_key, "")
        attached.append(enriched)
    return attached
@click.command()
@click.option(
"--observed-csv",
default="giant_output/products_observed.csv",
show_default=True,
help="Path to observed product rows.",
)
@click.option(
"--items-enriched-csv",
default="giant_output/items_enriched.csv",
show_default=True,
help="Path to enriched Giant item rows.",
)
@click.option(
"--output-csv",
default="giant_output/review_queue.csv",
show_default=True,
help="Path to review queue output.",
)
def main(observed_csv, items_enriched_csv, output_csv):
observed_rows = read_csv_rows(observed_csv)
item_rows = read_csv_rows(items_enriched_csv)
item_rows = attach_observed_ids(item_rows, observed_rows)
existing_rows = existing_review_state(output_csv)
today_text = str(date.today())
queue_rows = build_review_queue(observed_rows, item_rows, existing_rows, today_text)
write_csv_rows(output_csv, queue_rows, OUTPUT_FIELDS)
click.echo(f"wrote {len(queue_rows)} rows to {output_csv}")
if __name__ == "__main__":
main()

271
enrich_costco.py
View File

@@ -1,271 +0,0 @@
import csv
import json
import re
from pathlib import Path
import click
from enrich_giant import (
OUTPUT_FIELDS,
format_decimal,
normalize_number,
normalize_unit,
normalize_whitespace,
singularize_tokens,
to_decimal,
)
PARSER_VERSION = "costco-enrich-v1"
RETAILER = "costco"
DEFAULT_INPUT_DIR = Path("costco_output/raw")
DEFAULT_OUTPUT_CSV = Path("costco_output/items_enriched.csv")
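# Register/label code tokens seen in Costco receipt descriptions; these are
# stripped out before name normalization.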
CODE_TOKEN_RE = re.compile(
r"\b(?:SL\d+|T\d+H\d+|P\d+(?:/\d+)?|W\d+T\d+H\d+|FY\d+|CSPC#|C\d+T\d+H\d+|EC\d+T\d+H\d+|\d+X\d+)\b"
)
PACK_FRACTION_RE = re.compile(r"(?<![A-Z0-9])(\d+)\s*/\s*(\d+(?:\.\d+)?)\s*(OZ|LB|LBS|CT)\b")
HASH_SIZE_RE = re.compile(r"(?<![A-Z0-9])(\d+(?:\.\d+)?)#\b")
PACK_DASH_RE = re.compile(r"(?<![A-Z0-9])(\d+)\s*-\s*PACK\b")
PACK_WORD_RE = re.compile(r"(?<![A-Z0-9])(\d+)\s*PACK\b")
SIZE_RE = re.compile(r"(?<![A-Z0-9])(\d+(?:\.\d+)?)\s*(OZ|LB|LBS|CT|KG|G)\b")
def clean_costco_name(name):
cleaned = normalize_whitespace(name).upper().replace('"', "")
cleaned = CODE_TOKEN_RE.sub(" ", cleaned)
cleaned = re.sub(r"\s*/\s*\d+(?:\.\d+)?\s*(KG|G)\b", " ", cleaned)
cleaned = normalize_whitespace(cleaned)
return cleaned
def combine_description(item):
return normalize_whitespace(
" ".join(
str(part).strip()
for part in [item.get("itemDescription01"), item.get("itemDescription02")]
if part
)
)
def parse_costco_size_and_pack(cleaned_name):
pack_qty = ""
size_value = ""
size_unit = ""
match = PACK_FRACTION_RE.search(cleaned_name)
if match:
pack_qty = normalize_number(match.group(1))
size_value = normalize_number(match.group(2))
size_unit = normalize_unit(match.group(3))
return size_value, size_unit, pack_qty
match = HASH_SIZE_RE.search(cleaned_name)
if match:
size_value = normalize_number(match.group(1))
size_unit = "lb"
match = PACK_DASH_RE.search(cleaned_name) or PACK_WORD_RE.search(cleaned_name)
if match:
pack_qty = normalize_number(match.group(1))
matches = list(SIZE_RE.finditer(cleaned_name))
if matches:
last = matches[-1]
unit = last.group(2)
size_value = normalize_number(last.group(1))
size_unit = "count" if unit == "CT" else normalize_unit(unit)
return size_value, size_unit, pack_qty
def normalize_costco_name(cleaned_name):
brand = ""
base = cleaned_name
if base.startswith("KS "):
brand = "KS"
base = normalize_whitespace(base[3:])
size_value, size_unit, pack_qty = parse_costco_size_and_pack(base)
if size_value and size_unit:
if pack_qty:
base = PACK_FRACTION_RE.sub(" ", base)
else:
base = SIZE_RE.sub(" ", base)
base = HASH_SIZE_RE.sub(" ", base)
base = PACK_DASH_RE.sub(" ", base)
base = PACK_WORD_RE.sub(" ", base)
base = normalize_whitespace(base)
tokens = []
for token in base.split():
if token in {"ORG"}:
continue
if token in {"PEANUT", "BUTTER"} and "JIF" in base:
continue
tokens.append(token)
base = singularize_tokens(" ".join(tokens))
return normalize_whitespace(base), brand, size_value, size_unit, pack_qty
def guess_measure_type(size_unit, pack_qty, is_discount_line):
if is_discount_line:
return "each"
if size_unit in {"lb", "oz", "g", "kg"}:
return "weight"
if size_unit in {"ml", "l", "qt", "pt", "gal", "fl_oz"}:
return "volume"
if size_unit == "count" or pack_qty:
return "count"
return "each"
def derive_costco_prices(item, measure_type, size_value, size_unit, pack_qty):
line_total = to_decimal(item.get("amount"))
qty = to_decimal(item.get("unit"))
parsed_size = to_decimal(size_value)
parsed_pack = to_decimal(pack_qty) or 1
price_per_each = ""
price_per_lb = ""
price_per_oz = ""
if line_total is None:
return price_per_each, price_per_lb, price_per_oz
if measure_type in {"each", "count"} and qty not in (None, 0):
price_per_each = format_decimal(line_total / qty)
if parsed_size not in (None, 0):
total_units = parsed_size * parsed_pack * (qty or 1)
if size_unit == "lb":
per_lb = line_total / total_units
price_per_lb = format_decimal(per_lb)
price_per_oz = format_decimal(per_lb / 16)
elif size_unit == "oz":
per_oz = line_total / total_units
price_per_oz = format_decimal(per_oz)
price_per_lb = format_decimal(per_oz * 16)
return price_per_each, price_per_lb, price_per_oz
def is_discount_item(item):
amount = to_decimal(item.get("amount")) or 0
unit = to_decimal(item.get("unit")) or 0
description = combine_description(item)
return amount < 0 or unit < 0 or description.startswith("/")
def parse_costco_item(order_id, order_date, raw_path, line_no, item):
raw_name = combine_description(item)
cleaned_name = clean_costco_name(raw_name)
item_name_norm, brand_guess, size_value, size_unit, pack_qty = normalize_costco_name(
cleaned_name
)
is_discount_line = is_discount_item(item)
is_coupon_line = "true" if raw_name.startswith("/") else "false"
measure_type = guess_measure_type(size_unit, pack_qty, is_discount_line)
price_per_each, price_per_lb, price_per_oz = derive_costco_prices(
item, measure_type, size_value, size_unit, pack_qty
)
return {
"retailer": RETAILER,
"order_id": str(order_id),
"line_no": str(line_no),
"observed_item_key": f"{RETAILER}:{order_id}:{line_no}",
"order_date": normalize_whitespace(order_date),
"retailer_item_id": str(item.get("itemNumber", "")),
"pod_id": "",
"item_name": raw_name,
"upc": "",
"category_id": str(item.get("itemDepartmentNumber", "")),
"category": str(item.get("transDepartmentNumber", "")),
"qty": str(item.get("unit", "")),
"unit": str(item.get("itemIdentifier", "")),
"unit_price": str(item.get("itemUnitPriceAmount", "")),
"line_total": str(item.get("amount", "")),
"picked_weight": "",
"mvp_savings": "",
"reward_savings": "",
"coupon_savings": str(item.get("amount", "")) if is_discount_line else "",
"coupon_price": "",
"image_url": "",
"raw_order_path": raw_path.as_posix(),
"item_name_norm": item_name_norm,
"brand_guess": brand_guess,
"variant": "",
"size_value": size_value,
"size_unit": size_unit,
"pack_qty": pack_qty,
"measure_type": measure_type,
"is_store_brand": "true" if brand_guess else "false",
"is_fee": "false",
"is_discount_line": "true" if is_discount_line else "false",
"is_coupon_line": is_coupon_line,
"price_per_each": price_per_each,
"price_per_lb": price_per_lb,
"price_per_oz": price_per_oz,
"parse_version": PARSER_VERSION,
"parse_notes": "",
}
def iter_costco_rows(raw_dir):
for path in discover_json_files(raw_dir):
if path.name == "summary.json":
continue
payload = json.loads(path.read_text(encoding="utf-8"))
receipts = payload.get("data", {}).get("receiptsWithCounts", {}).get("receipts", [])
for receipt in receipts:
order_id = receipt["transactionBarcode"]
order_date = receipt.get("transactionDate", "")
for line_no, item in enumerate(receipt.get("itemArray", []), start=1):
yield parse_costco_item(order_id, order_date, path, line_no, item)
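# Prefer json files inside raw/; fall back to the parent directory when a
# flat layout was used.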
def discover_json_files(raw_dir):
raw_dir = Path(raw_dir)
candidates = sorted(raw_dir.glob("*.json"))
if candidates:
return candidates
if raw_dir.name == "raw" and raw_dir.parent.exists():
return sorted(raw_dir.parent.glob("*.json"))
return []
def build_items_enriched(raw_dir):
rows = list(iter_costco_rows(raw_dir))
rows.sort(key=lambda row: (row["order_date"], row["order_id"], int(row["line_no"])))
return rows
def write_csv(path, rows):
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=OUTPUT_FIELDS)
writer.writeheader()
writer.writerows(rows)
@click.command()
@click.option(
"--input-dir",
default=str(DEFAULT_INPUT_DIR),
show_default=True,
help="Directory containing Costco raw order json files.",
)
@click.option(
"--output-csv",
default=str(DEFAULT_OUTPUT_CSV),
show_default=True,
help="CSV path for enriched Costco item rows.",
)
def main(input_dir, output_csv):
rows = build_items_enriched(Path(input_dir))
write_csv(Path(output_csv), rows)
click.echo(f"wrote {len(rows)} rows to {output_csv}")
if __name__ == "__main__":
main()

455
enrich_giant.py
View File

@@ -1,455 +0,0 @@
import csv
import json
import re
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from pathlib import Path
import click
PARSER_VERSION = "giant-enrich-v1"
RETAILER = "giant"
DEFAULT_INPUT_DIR = Path("giant_output/raw")
DEFAULT_OUTPUT_CSV = Path("giant_output/items_enriched.csv")
OUTPUT_FIELDS = [
"retailer",
"order_id",
"line_no",
"observed_item_key",
"order_date",
"retailer_item_id",
"pod_id",
"item_name",
"upc",
"category_id",
"category",
"qty",
"unit",
"unit_price",
"line_total",
"picked_weight",
"mvp_savings",
"reward_savings",
"coupon_savings",
"coupon_price",
"image_url",
"raw_order_path",
"item_name_norm",
"brand_guess",
"variant",
"size_value",
"size_unit",
"pack_qty",
"measure_type",
"is_store_brand",
"is_fee",
"is_discount_line",
"is_coupon_line",
"price_per_each",
"price_per_lb",
"price_per_oz",
"parse_version",
"parse_notes",
]
STORE_BRAND_PREFIXES = {
"SB": "SB",
"NP": "NP",
}
DROP_TOKENS = {"FRESH"}
ABBREVIATIONS = {
"APPLE": "APPLE",
"APPLES": "APPLES",
"APLE": "APPLE",
"BASIL": "BASIL",
"BLK": "BLACK",
"BNLS": "BONELESS",
"BRWN": "BROWN",
"CARROTS": "CARROTS",
"CHDR": "CHEDDAR",
"CHICKEN": "CHICKEN",
"CHOC": "CHOCOLATE",
"CHS": "CHEESE",
"CHSE": "CHEESE",
"CHZ": "CHEESE",
"CILANTRO": "CILANTRO",
"CKI": "COOKIE",
"CRSHD": "CRUSHED",
"FLR": "FLOUR",
"FRSH": "FRESH",
"GALA": "GALA",
"GRAHM": "GRAHAM",
"HOT": "HOT",
"HRSRDSH": "HORSERADISH",
"IMP": "IMPORTED",
"IQF": "IQF",
"LENTILS": "LENTILS",
"LG": "LARGE",
"MLK": "MILK",
"MSTRD": "MUSTARD",
"ONION": "ONION",
"ORG": "ORGANIC",
"PEPPER": "PEPPER",
"PEPPERS": "PEPPERS",
"POT": "POTATO",
"POTATO": "POTATO",
"PPR": "PEPPER",
"RICOTTA": "RICOTTA",
"ROASTER": "ROASTER",
"ROTINI": "ROTINI",
"SCE": "SAUCE",
"SLC": "SLICED",
"SPINCH": "SPINACH",
"SPNC": "SPINACH",
"SPINACH": "SPINACH",
"SQZ": "SQUEEZE",
"SWT": "SWEET",
"THYME": "THYME",
"TOM": "TOMATO",
"TOMS": "TOMATOES",
"TRTL": "TORTILLA",
"VEG": "VEGETABLE",
"VINEGAR": "VINEGAR",
"WHT": "WHITE",
"WHOLE": "WHOLE",
"YLW": "YELLOW",
"YLWGLD": "YELLOW_GOLD",
}
FEE_PATTERNS = [
re.compile(r"\bBAG CHARGE\b"),
re.compile(r"\bDISC AT TOTAL\b"),
]
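# The negative lookbehind keeps size/pack matches from starting mid-token
# (e.g. inside a product code); the trailing \b anchors the unit.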
SIZE_RE = re.compile(r"(?<![A-Z0-9])(\d+(?:\.\d+)?)(?:\s*)(OZ|Z|LB|LBS|ML|L|FZ|FL OZ|QT|PT|GAL|GA)\b")
PACK_RE = re.compile(r"(?<![A-Z0-9])(\d+(?:\.\d+)?)(?:\s*)(CT|PK|PKG|PACK)\b")
def to_decimal(value):
if value in ("", None):
return None
try:
return Decimal(str(value))
except (InvalidOperation, ValueError):
return None
def format_decimal(value, places=4):
if value is None:
return ""
quant = Decimal("1").scaleb(-places)
normalized = value.quantize(quant, rounding=ROUND_HALF_UP).normalize()
return format(normalized, "f")
def normalize_whitespace(value):
return " ".join(str(value or "").strip().split())
def clean_item_name(name):
cleaned = normalize_whitespace(name).upper()
cleaned = re.sub(r"^\+", "", cleaned)
cleaned = re.sub(r"^PLU#\d+\s*", "", cleaned)
cleaned = cleaned.replace("#", " ")
return normalize_whitespace(cleaned)
def extract_store_brand_prefix(cleaned_name):
for prefix, brand in STORE_BRAND_PREFIXES.items():
if cleaned_name == prefix or cleaned_name.startswith(f"{prefix} "):
return prefix, brand
return "", ""
def extract_image_url(item):
image = item.get("image")
if isinstance(image, dict):
for key in ["xlarge", "large", "medium", "small"]:
value = image.get(key)
if value:
return value
if isinstance(image, str):
return image
return ""
def parse_size_and_pack(cleaned_name):
size_value = ""
size_unit = ""
pack_qty = ""
size_matches = list(SIZE_RE.finditer(cleaned_name))
if size_matches:
match = size_matches[-1]
size_value = normalize_number(match.group(1))
size_unit = normalize_unit(match.group(2))
pack_matches = list(PACK_RE.finditer(cleaned_name))
if pack_matches:
match = pack_matches[-1]
pack_qty = normalize_number(match.group(1))
return size_value, size_unit, pack_qty
def normalize_number(value):
decimal = to_decimal(value)
if decimal is None:
return ""
return format(decimal.normalize(), "f")
def normalize_unit(unit):
collapsed = normalize_whitespace(unit).upper()
return {
"Z": "oz",
"OZ": "oz",
"FZ": "fl_oz",
"FL OZ": "fl_oz",
"LB": "lb",
"LBS": "lb",
"ML": "ml",
"L": "l",
"QT": "qt",
"PT": "pt",
"GAL": "gal",
"GA": "gal",
}.get(collapsed, collapsed.lower())
def strip_measure_tokens(cleaned_name):
without_sizes = SIZE_RE.sub(" ", cleaned_name)
without_measures = PACK_RE.sub(" ", without_sizes)
return normalize_whitespace(without_measures)
def expand_token(token):
return ABBREVIATIONS.get(token, token)
def normalize_item_name(cleaned_name):
prefix, _brand = extract_store_brand_prefix(cleaned_name)
base = cleaned_name
if prefix:
base = normalize_whitespace(base[len(prefix):])
base = strip_measure_tokens(base)
expanded_tokens = []
for token in base.split():
expanded = expand_token(token)
if expanded in DROP_TOKENS:
continue
expanded_tokens.append(expanded)
expanded = " ".join(token for token in expanded_tokens if token)
return singularize_tokens(normalize_whitespace(expanded))
def singularize_tokens(text):
singular_map = {
"APPLES": "APPLE",
"BANANAS": "BANANA",
"BERRIES": "BERRY",
"EGGS": "EGG",
"LEMONS": "LEMON",
"LIMES": "LIME",
"MANDARINS": "MANDARIN",
"PEPPERS": "PEPPER",
"STRAWBERRIES": "STRAWBERRY",
}
tokens = [singular_map.get(token, token) for token in text.split()]
return normalize_whitespace(" ".join(tokens))
def guess_measure_type(item, size_unit, pack_qty):
unit = normalize_whitespace(item.get("lbEachCd")).upper()
picked_weight = to_decimal(item.get("totalPickedWeight"))
qty = to_decimal(item.get("shipQy"))
if unit == "LB" or (picked_weight is not None and picked_weight > 0 and unit != "EA"):
return "weight"
if size_unit in {"lb", "oz"}:
return "weight"
if size_unit in {"ml", "l", "qt", "pt", "gal", "fl_oz"}:
return "volume"
if pack_qty:
return "count"
if unit == "EA" or (qty is not None and qty > 0):
return "each"
return ""
def is_fee_item(cleaned_name):
return any(pattern.search(cleaned_name) for pattern in FEE_PATTERNS)
def derive_prices(item, measure_type, size_value="", size_unit="", pack_qty=""):
qty = to_decimal(item.get("shipQy"))
line_total = to_decimal(item.get("groceryAmount"))
picked_weight = to_decimal(item.get("totalPickedWeight"))
parsed_size = to_decimal(size_value)
parsed_pack = to_decimal(pack_qty) or Decimal("1")
price_per_each = ""
price_per_lb = ""
price_per_oz = ""
if line_total is None:
return price_per_each, price_per_lb, price_per_oz
if measure_type == "each" and qty not in (None, Decimal("0")):
price_per_each = format_decimal(line_total / qty)
if measure_type == "count" and qty not in (None, Decimal("0")):
price_per_each = format_decimal(line_total / qty)
if measure_type == "weight" and picked_weight not in (None, Decimal("0")):
per_lb = line_total / picked_weight
price_per_lb = format_decimal(per_lb)
price_per_oz = format_decimal(per_lb / Decimal("16"))
return price_per_each, price_per_lb, price_per_oz
if measure_type == "weight" and parsed_size not in (None, Decimal("0")) and qty not in (None, Decimal("0")):
total_units = qty * parsed_pack * parsed_size
if size_unit == "lb":
per_lb = line_total / total_units
price_per_lb = format_decimal(per_lb)
price_per_oz = format_decimal(per_lb / Decimal("16"))
elif size_unit == "oz":
per_oz = line_total / total_units
price_per_oz = format_decimal(per_oz)
price_per_lb = format_decimal(per_oz * Decimal("16"))
return price_per_each, price_per_lb, price_per_oz
def parse_item(order_id, order_date, raw_path, line_no, item):
cleaned_name = clean_item_name(item.get("itemName", ""))
size_value, size_unit, pack_qty = parse_size_and_pack(cleaned_name)
prefix, brand_guess = extract_store_brand_prefix(cleaned_name)
normalized_name = normalize_item_name(cleaned_name)
measure_type = guess_measure_type(item, size_unit, pack_qty)
price_per_each, price_per_lb, price_per_oz = derive_prices(
item,
measure_type,
size_value=size_value,
size_unit=size_unit,
pack_qty=pack_qty,
)
is_fee = is_fee_item(cleaned_name)
parse_notes = []
if prefix:
parse_notes.append(f"store_brand_prefix={prefix}")
if is_fee:
parse_notes.append("fee_item")
if size_value and not size_unit:
parse_notes.append("size_without_unit")
return {
"retailer": RETAILER,
"order_id": str(order_id),
"line_no": str(line_no),
"observed_item_key": f"{RETAILER}:{order_id}:{line_no}",
"order_date": normalize_whitespace(order_date),
"retailer_item_id": stringify(item.get("podId")),
"pod_id": stringify(item.get("podId")),
"item_name": stringify(item.get("itemName")),
"upc": stringify(item.get("primUpcCd")),
"category_id": stringify(item.get("categoryId")),
"category": stringify(item.get("categoryDesc")),
"qty": stringify(item.get("shipQy")),
"unit": stringify(item.get("lbEachCd")),
"unit_price": stringify(item.get("unitPrice")),
"line_total": stringify(item.get("groceryAmount")),
"picked_weight": stringify(item.get("totalPickedWeight")),
"mvp_savings": stringify(item.get("mvpSavings")),
"reward_savings": stringify(item.get("rewardSavings")),
"coupon_savings": stringify(item.get("couponSavings")),
"coupon_price": stringify(item.get("couponPrice")),
"image_url": extract_image_url(item),
"raw_order_path": raw_path.as_posix(),
"item_name_norm": normalized_name,
"brand_guess": brand_guess,
"variant": "",
"size_value": size_value,
"size_unit": size_unit,
"pack_qty": pack_qty,
"measure_type": measure_type,
"is_store_brand": "true" if bool(prefix) else "false",
"is_fee": "true" if is_fee else "false",
"is_discount_line": "false",
"is_coupon_line": "false",
"price_per_each": price_per_each,
"price_per_lb": price_per_lb,
"price_per_oz": price_per_oz,
"parse_version": PARSER_VERSION,
"parse_notes": ";".join(parse_notes),
}
def stringify(value):
if value is None:
return ""
return str(value)
def iter_order_rows(raw_dir):
for path in sorted(raw_dir.glob("*.json")):
if path.name == "history.json":
continue
payload = json.loads(path.read_text(encoding="utf-8"))
order_id = payload.get("orderId", path.stem)
order_date = payload.get("orderDate", "")
for line_no, item in enumerate(payload.get("items", []), start=1):
yield parse_item(order_id, order_date, path, line_no, item)
def build_items_enriched(raw_dir):
rows = list(iter_order_rows(raw_dir))
rows.sort(key=lambda row: (row["order_date"], row["order_id"], int(row["line_no"])))
return rows
def write_csv(path, rows):
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=OUTPUT_FIELDS)
writer.writeheader()
writer.writerows(rows)
@click.command()
@click.option(
"--input-dir",
default=str(DEFAULT_INPUT_DIR),
show_default=True,
help="Directory containing Giant raw order json files.",
)
@click.option(
"--output-csv",
default=str(DEFAULT_OUTPUT_CSV),
show_default=True,
help="CSV path for enriched Giant item rows.",
)
def main(input_dir, output_csv):
raw_dir = Path(input_dir)
output_path = Path(output_csv)
if not raw_dir.exists():
raise click.ClickException(f"input dir does not exist: {raw_dir}")
rows = build_items_enriched(raw_dir)
write_csv(output_path, rows)
click.echo(f"wrote {len(rows)} rows to {output_path}")
if __name__ == "__main__":
main()

54
layer_helpers.py
View File

@@ -1,54 +0,0 @@
import csv
import hashlib
from collections import Counter
from pathlib import Path
def read_csv_rows(path):
path = Path(path)
with path.open(newline="", encoding="utf-8") as handle:
return list(csv.DictReader(handle))
def write_csv_rows(path, rows, fieldnames):
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
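# Deterministic 12-hex-char id derived from the grouping key, so ids are
# stable across reruns.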
def stable_id(prefix, raw_key):
digest = hashlib.sha1(str(raw_key).encode("utf-8")).hexdigest()[:12]
return f"{prefix}_{digest}"
def first_nonblank(rows, field):
for row in rows:
value = row.get(field, "")
if value:
return value
return ""
def representative_value(rows, field):
values = [row.get(field, "") for row in rows if row.get(field, "")]
if not values:
return ""
counts = Counter(values)
return sorted(counts.items(), key=lambda item: (-item[1], item[0]))[0][0]
def distinct_values(rows, field):
return sorted({row.get(field, "") for row in rows if row.get(field, "")})
def compact_join(values, limit=3):
unique = []
seen = set()
for value in values:
if value and value not in seen:
seen.add(value)
unique.append(value)
return " | ".join(unique[:limit])

309
pm/data-model.org
View File

@@ -1,309 +0,0 @@
* grocery data model and file layout
This document defines the shared file layout and stable CSV schemas for the
grocery pipeline. The goal is to keep retailer-specific ingest separate from
cross-retailer product modeling so Giant-specific quirks do not become the
system of record.
** design rules
- Raw retailer exports remain the source of truth.
- Retailer parsing is isolated to retailer-specific files and ids.
- Cross-retailer product layers begin only after retailer-specific enrichment.
- CSV schemas are stable and additive: new columns may be appended, but
existing columns should not be repurposed.
- Unknown values should be left blank rather than guessed.
** directory layout
Use one top-level data root:
#+begin_example
data/
giant/
raw/
history.json
orders/
<order_id>.json
orders.csv
items_raw.csv
items_enriched.csv
products_observed.csv
costco/
raw/
...
orders.csv
items_raw.csv
items_enriched.csv
products_observed.csv
shared/
products_canonical.csv
product_links.csv
review_queue.csv
#+end_example
** layer responsibilities
- `data/<retailer>/raw/`
Stores unmodified retailer payloads exactly as fetched.
- `data/<retailer>/orders.csv`
One row per retailer order or visit, flattened from raw order data.
- `data/<retailer>/items_raw.csv`
One row per retailer line item, preserving retailer-native values needed for
reruns and debugging.
- `data/<retailer>/items_enriched.csv`
Parsed retailer line items with normalized fields and derived guesses, still
retailer-specific.
- `data/<retailer>/products_observed.csv`
Distinct retailer-facing observed products aggregated from enriched items.
- `data/shared/products_canonical.csv`
Cross-retailer canonical product entities used for comparison.
- `data/shared/product_links.csv`
Links from retailer observed products to canonical products.
- `data/shared/review_queue.csv`
Human review queue for unresolved or low-confidence matching/parsing cases.
** retailer-specific versus shared
Retailer-specific:
- raw json payloads
- retailer order ids
- retailer line numbers
- retailer category ids and names
- retailer item names
- retailer image urls
- parsed guesses derived from one retailer feed
- observed products scoped to one retailer
Shared:
- canonical products
- observed-to-canonical links
- human review state for unresolved cases
- comparison-ready normalized quantity basis fields
Observed products are the boundary between retailer-specific parsing and
cross-retailer canonicalization. Nothing upstream of `products_observed.csv`
should require knowledge of another retailer.
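For intuition, a Giant line item with a UPC groups under an observed key of
the shape used by `build_observed_products.py` (hypothetical values), which is
then hashed into a stable `observed_product_id`:
#+begin_example
giant|upc=004100012345|name=SB CHEDDAR CHEESE
#+end_example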
** schema: `data/<retailer>/orders.csv`
One row per order or visit.
| column | meaning |
|-
| `retailer` | retailer slug such as `giant` |
| `order_id` | retailer order or visit id |
| `order_date` | order date in `YYYY-MM-DD` when available |
| `delivery_date` | fulfillment date in `YYYY-MM-DD` when available |
| `service_type` | retailer service type such as `INSTORE` |
| `order_total` | order total as provided by retailer |
| `payment_method` | retailer payment label |
| `total_item_count` | total line count or item count from retailer |
| `total_savings` | total savings as provided by retailer |
| `your_savings_total` | savings field from retailer when present |
| `coupons_discounts_total` | coupon/discount total from retailer |
| `store_name` | retailer store name |
| `store_number` | retailer store number |
| `store_address1` | street address |
| `store_city` | city |
| `store_state` | state or province |
| `store_zipcode` | postal code |
| `refund_order` | retailer refund flag |
| `ebt_order` | retailer EBT flag |
| `raw_history_path` | relative path to source history payload |
| `raw_order_path` | relative path to source order payload |
Primary key:
- (`retailer`, `order_id`)
** schema: `data/<retailer>/items_raw.csv`
One row per retailer line item.
| column | meaning |
|------------------+-----------------------------------------|
| `retailer` | retailer slug |
| `order_id` | retailer order id |
| `line_no` | stable line number within order export |
| `order_date` | copied from order when available |
| `retailer_item_id` | retailer-native item id when available |
| `pod_id` | retailer pod/item id |
| `item_name` | raw retailer item name |
| `upc` | retailer UPC or PLU value |
| `category_id` | retailer category id |
| `category` | retailer category description |
| `qty` | retailer quantity field |
| `unit` | retailer unit code such as `EA` or `LB` |
| `unit_price` | retailer unit price field |
| `line_total` | retailer extended price field |
| `picked_weight` | retailer picked weight field |
| `mvp_savings` | retailer savings field |
| `reward_savings` | retailer rewards savings field |
| `coupon_savings` | retailer coupon savings field |
| `coupon_price` | retailer coupon price field |
| `image_url` | raw retailer image url when present |
| `raw_order_path` | relative path to source order payload |
| `is_discount_line` | retailer adjustment or discount-line flag |
| `is_coupon_line` | coupon-like line flag when distinguishable |
Primary key:
- (`retailer`, `order_id`, `line_no`)
** schema: `data/<retailer>/items_enriched.csv`
One row per retailer line item after deterministic parsing. Preserve the raw
fields from `items_raw.csv` and add parsed fields.
| column | meaning |
|---------------------+-------------------------------------------------------------|
| `retailer` | retailer slug |
| `order_id` | retailer order id |
| `line_no` | line number within order |
| `observed_item_key` | stable row key, typically `<retailer>:<order_id>:<line_no>` |
| `retailer_item_id` | retailer-native item id |
| `item_name` | raw retailer item name |
| `item_name_norm` | normalized item name |
| `brand_guess` | parsed brand guess |
| `variant` | parsed variant text |
| `size_value` | parsed numeric size value |
| `size_unit` | parsed size unit such as `oz`, `lb`, `fl_oz` |
| `pack_qty` | parsed pack or count guess |
| `measure_type` | `each`, `weight`, `volume`, `count`, or blank |
| `is_store_brand` | store-brand guess |
| `is_fee` | fee or non-product flag |
| `is_discount_line` | discount or adjustment-line flag |
| `is_coupon_line` | coupon-like line flag |
| `price_per_each` | derived per-each price when supported |
| `price_per_lb` | derived per-pound price when supported |
| `price_per_oz` | derived per-ounce price when supported |
| `image_url` | best available retailer image url |
| `parse_version` | parser version string for reruns |
| `parse_notes` | optional non-fatal parser notes |
Primary key:
- (`retailer`, `order_id`, `line_no`)
** schema: `data/<retailer>/products_observed.csv`
One row per distinct retailer-facing observed product.
| column | meaning |
|-------------------------------+----------------------------------------------------------------|
| `observed_product_id` | stable observed product id |
| `retailer` | retailer slug |
| `observed_key` | deterministic grouping key used to create the observed product |
| `representative_retailer_item_id` | best representative retailer-native item id |
| `representative_upc` | best representative UPC/PLU |
| `representative_item_name` | representative raw retailer name |
| `representative_name_norm` | representative normalized name |
| `representative_brand` | representative brand guess |
| `representative_variant` | representative variant |
| `representative_size_value` | representative size value |
| `representative_size_unit` | representative size unit |
| `representative_pack_qty` | representative pack/count |
| `representative_measure_type` | representative measure type |
| `representative_image_url` | representative image url |
| `is_store_brand` | representative store-brand flag |
| `is_fee` | representative fee flag |
| `is_discount_line` | representative discount-line flag |
| `is_coupon_line` | representative coupon-line flag |
| `first_seen_date` | first order date seen |
| `last_seen_date` | last order date seen |
| `times_seen` | number of enriched item rows grouped here |
| `example_order_id` | one example retailer order id |
| `example_item_name` | one example raw item name |
| `distinct_retailer_item_ids_count` | count of distinct retailer-native item ids |
Primary key:
- (`observed_product_id`)
** schema: `data/shared/products_canonical.csv`
One row per cross-retailer canonical product.
| column | meaning |
|----------------------------+--------------------------------------------------|
| `canonical_product_id` | stable canonical product id |
| `canonical_name` | canonical human-readable name |
| `product_type` | broad class such as `apple`, `milk`, `trash_bag` |
| `brand` | canonical brand when applicable |
| `variant` | canonical variant |
| `size_value` | normalized size value |
| `size_unit` | normalized size unit |
| `pack_qty` | normalized pack/count |
| `measure_type` | normalized measure type |
| `normalized_quantity` | numeric comparison basis value |
| `normalized_quantity_unit` | basis unit such as `oz`, `lb`, `count` |
| `notes` | optional human notes |
| `created_at` | creation timestamp or date |
| `updated_at` | last update timestamp or date |
Primary key:
- (`canonical_product_id`)
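For example, a hypothetical two-pack of 8 oz blocks normalizes to a 16 oz
comparison basis:
#+begin_example
canonical_product_id: gcan_1f2e3d4c5b6a   (hypothetical)
canonical_name: CHEDDAR CHEESE
size_value: 8   size_unit: oz   pack_qty: 2
normalized_quantity: 16   normalized_quantity_unit: oz
#+end_example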
** schema: `data/shared/product_links.csv`
One row per observed-to-canonical relationship.
| column | meaning |
|-
| `observed_product_id` | retailer observed product id |
| `canonical_product_id` | linked canonical product id |
| `link_method` | `manual`, `exact_upc`, `exact_name`, etc. |
| `link_confidence` | optional confidence label |
| `review_status` | `pending`, `approved`, `rejected`, or blank |
| `reviewed_by` | reviewer id or initials |
| `reviewed_at` | review timestamp or date |
| `link_notes` | optional notes |
Primary key:
- (`observed_product_id`, `canonical_product_id`)
** schema: `data/shared/review_queue.csv`
One row per issue needing human review.
| column | meaning |
|-
| `review_id` | stable review row id |
| `queue_type` | `observed_product`, `link_candidate`, `parse_issue` |
| `retailer` | retailer slug when applicable |
| `observed_product_id` | observed product id when applicable |
| `canonical_product_id` | candidate canonical id when applicable |
| `reason_code` | machine-readable review reason |
| `priority` | optional priority label |
| `raw_item_names` | compact list of example raw names |
| `normalized_names` | compact list of example normalized names |
| `upc` | example UPC/PLU |
| `image_url` | example image url |
| `example_prices` | compact list of example prices |
| `seen_count` | count of related rows |
| `status` | `pending`, `approved`, `rejected`, `deferred` |
| `resolution_notes` | reviewer notes |
| `created_at` | creation timestamp or date |
| `updated_at` | last update timestamp or date |
Primary key:
- (`review_id`)
** current giant mapping
Current scraper outputs map to the new layout as follows:
- `giant_output/raw/history.json` -> `data/giant/raw/history.json`
- `giant_output/raw/<order_id>.json` -> `data/giant/raw/orders/<order_id>.json`
- `giant_output/orders.csv` -> `data/giant/orders.csv`
- `giant_output/items.csv` -> `data/giant/items_raw.csv`
Current Giant raw order payloads already expose fields needed for future
enrichment, including `image`, `itemName`, `primUpcCd`, `lbEachCd`,
`unitPrice`, `groceryAmount`, and `totalPickedWeight`.

File diff suppressed because one or more lines are too long

pm/tasks.org
View File

@@ -1,4 +1,4 @@
* [X] t1.1: harden giant receipt fetch cli (2-4 commits)
* [ ] t1.1: harden giant receipt fetch cli (2-4 commits)
** acceptance criteria
- giant scraper runs from cli with prompts or env-backed defaults for `user_id` and `loyalty`
- script reuses current browser session via firefox cookies + `curl_cffi`
@@ -12,11 +12,11 @@
- raw json archive remains source of truth
** evidence
- commit: `d57b9cf` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python scraper.py --help`; verified `.env` loading via `scraper.load_config()`
- date: 2026-03-14
- commit:
- tests:
- date:
* [X] t1.2: define grocery data model and file layout (1-2 commits)
* [ ] t1.2: define grocery data model and file layout (1-2 commits)
** acceptance criteria
- decide and document the files/directories for:
- retailer raw exports
@@ -28,15 +28,15 @@
- explicitly separate retailer-specific parsing from cross-retailer canonicalization
** notes
- this is the guardrail task so we don't make giant-specific hacks the system of record
- this is the guardrail task so we dont make giant-specific hacks the system of record
- keep schema minimal but extensible
** evidence
- commit: `42dbae1` on branch `cx`
- tests: reviewed `giant_output/raw/history.json`, one sample raw order json, `giant_output/orders.csv`, `giant_output/items.csv`; documented schemas in `pm/data-model.org`
- date: 2026-03-15
- commit:
- tests:
- date:
* [X] t1.3: build giant parser/enricher from raw json (2-4 commits)
* [ ] t1.3: build giant parser/enricher from raw json (2-4 commits)
** acceptance criteria
- parser reads giant raw order json files
- outputs `items_enriched.csv`
@@ -54,11 +54,11 @@
- parser should preserve ambiguity rather than hallucinating precision
** evidence
- commit: `14f2cc2` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python enrich_giant.py`; verified `giant_output/items_enriched.csv` on real raw data
- date: 2026-03-16
- commit:
- tests:
- date:
* [X] t1.4: generate observed-product layer from enriched items (2-3 commits)
* [ ] t1.4: generate observed-product layer from enriched items (2-3 commits)
** acceptance criteria
- distinct observed products are generated from enriched giant items
@@ -76,11 +76,11 @@
- likely key is some combo of retailer + upc + normalized name
** evidence
- commit: `dc39214` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python build_observed_products.py`; verified `giant_output/products_observed.csv`
- date: 2026-03-16
- commit:
- tests:
- date:
* [X] t1.5: build review queue for unresolved or low-confidence products (1-3 commits)
* [ ] t1.5: build review queue for unresolved or low-confidence products (1-3 commits)
** acceptance criteria
- produce a review file containing observed products needing manual review
@@ -98,11 +98,11 @@
- optimize for “approve once, remember forever”
** evidence
- commit: `9b13ec3` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python build_review_queue.py`; verified `giant_output/review_queue.csv`
- date: 2026-03-16
- commit:
- tests:
- date:
* [X] t1.6: create canonical product layer and observed→canonical links (2-4 commits)
* [ ] t1.6: create canonical product layer and observed→canonical links (2-4 commits)
** acceptance criteria
- define and create `products_canonical.csv`
@@ -120,11 +120,11 @@
- do not require llm assistance for v1
** evidence
- commit: `347cd44` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python build_canonical_layer.py`; verified seeded `giant_output/products_canonical.csv` and `giant_output/product_links.csv`
- date: 2026-03-16
- commit:
- tests:
- date:
* [X] t1.7: implement auto-link rules for easy matches (2-3 commits)
* [ ] t1.7: implement auto-link rules for easy matches (2-3 commits)
** acceptance criteria
- auto-link can match observed products to canonical products using deterministic rules
@@ -139,191 +139,43 @@
- false positives are worse than unresolved items
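the rule cascade the tests pin down looks roughly like this (a sketch; `by_upc`, `by_name_size`, and `by_name` stand in for hypothetical canonical lookup indexes, and the real logic lives in `build_canonical_layer.py`):

```python
# sketch of the deterministic auto-link cascade; lookup dicts are assumed
def link_method(observed, by_upc, by_name_size, by_name):
    if observed["is_fee"] == "true":
        return None  # fee rows are never auto-linked
    if observed["representative_upc"] and observed["representative_upc"] in by_upc:
        return "exact_upc"
    name_size = (
        observed["representative_name_norm"],
        observed["representative_size_value"],
        observed["representative_size_unit"],
    )
    if observed["representative_size_value"] and name_size in by_name_size:
        return "exact_name_size"
    if not observed["representative_size_value"] and observed["representative_name_norm"] in by_name:
        return "exact_name"  # only when no size context can conflict
    return None  # unresolved beats a false positive
```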
** evidence
- commit: `385a31c` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python build_canonical_layer.py`; verified auto-linked `giant_output/products_canonical.csv` and `giant_output/product_links.csv`
- date: 2026-03-16
- commit:
- tests:
- date:
* [X] t1.8: support costco raw ingest path (2-5 commits)
* [ ] t1.8: support costco raw ingest path (2-5 commits)
** acceptance criteria
- add a costco-specific raw ingest/export path
- fetch costco receipt summary and receipt detail payloads from graphql endpoint
- persist raw json under `costco_output/raw/` and flatten to `orders.csv` and `items.csv`, same format as giant
- use costco-native identifiers: `transactionBarcode` as order id and `itemNumber` as retailer item id
- preserve discount/coupon rows rather than dropping
** notes
- focus on raw costco acquisition and flattening
- do not force costco identifiers into `upc`
- bearer/auth values should come from local env, not source
** evidence
- commit: `da00288` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python scrape_costco.py --help`; verified `costco_output/raw/*.json`, `costco_output/orders.csv`, and `costco_output/items.csv` from the local sample payload
- date: 2026-03-16
* [X] t1.8.1: support costco parser/enricher path (2-4 commits)
** acceptance criteria
- add a costco-specific enrich step producing `costco_output/items_enriched.csv`
- output rows into the same shared enriched schema family as Giant
- support costco-specific parsing for:
- `itemDescription01` + `itemDescription02`
- `itemNumber` as `retailer_item_id`
- discount lines / negative rows
- common size patterns such as `25#`, `48 OZ`, `2/24 OZ`, `6-PACK` (one possible grammar is sketched below)
- preserve obvious unknowns as blank rather than guessed values
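one possible shape for that size grammar (a sketch, not the shipped `enrich_costco.py`):

```python
# illustrative costco size parsing; returns (pack_qty, size_value, size_unit),
# with None for anything unknown rather than a guess
import re

SIZE_PATTERNS = [
    (re.compile(r"^(\d+)/(\d+(?:\.\d+)?) ?OZ$"), lambda m: (int(m[1]), float(m[2]), "oz")),  # "2/24 OZ"
    (re.compile(r"^(\d+(?:\.\d+)?) ?OZ$"), lambda m: (1, float(m[1]), "oz")),                # "48 OZ"
    (re.compile(r"^(\d+(?:\.\d+)?)#$"), lambda m: (1, float(m[1]), "lb")),                   # "25#"
    (re.compile(r"^(\d+)-PACK$"), lambda m: (int(m[1]), None, None)),                        # "6-PACK"
]

def parse_size(text):
    text = text.strip().upper()
    for pattern, extract in SIZE_PATTERNS:
        match = pattern.match(text)
        if match:
            return extract(match)
    return (None, None, None)  # preserve unknowns as blank downstream
```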
** notes
- this is the real schema compatibility proof, not raw ingest alone
- expect weaker identifiers than Giant
** evidence
- commit: `da00288` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python enrich_costco.py`; verified `costco_output/items_enriched.csv`
- date: 2026-03-16
* [X] t1.8.2: validate cross-retailer observed/canonical flow (1-3 commits)
** acceptance criteria
- feed Giant and Costco enriched rows through the same observed/canonical pipeline
- output costco line items into the same shared raw/enriched schema family
- confirm at least one product class can exist as:
- Giant observed product
- Costco observed product
- giant observed product
- costco observed product
- one shared canonical product
- document the exact example used for proof
** notes
- keep this to one or two well-behaved product classes first
- apples, eggs, bananas, or flour are better than weird prepared foods
** evidence
- commit: `da00288` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python validate_cross_retailer_flow.py`; proof example: Giant `FRESH BANANA` and Costco `BANANAS 3 LB / 1.36 KG` share one canonical in `combined_output/proof_examples.csv`
- date: 2026-03-16
* [X] t1.8.3: extend shared schema for retailer-native ids and adjustment lines (1-2 commits)
** acceptance criteria
- add shared fields needed for non-upc retailers, including:
- `retailer_item_id`
- `is_discount_line`
- `is_coupon_line` or equivalent if needed
- keep `upc` nullable across the pipeline
- update downstream builders/tests to accept retailers with blank `upc`
** notes
- this prevents costco from becoming a schema hack
- do this once instead of sprinkling exceptions everywhere
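the shared fields end up looking like this on every enriched row, regardless of retailer (values shown are illustrative):

```python
# illustrative enriched-row fields added by this task; blank string == null
shared_fields = {
    "retailer": "costco",          # which ingest path produced the row
    "upc": "",                     # nullable: costco rows leave this blank
    "retailer_item_id": "374664",  # retailer-native id, e.g. costco itemNumber
    "is_discount_line": "true",    # adjustment rows are kept and flagged
    "is_coupon_line": "true",      # the coupon-tied subset of discount rows
}
```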
** evidence
- commit: `9497565` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; verified shared enriched fields in `giant_output/items_enriched.csv` and `costco_output/items_enriched.csv`
- date: 2026-03-16
* [X] t1.8.4: verify and correct costco receipt enumeration (1-2 commits)
** acceptance criteria
- confirm graphql summary query returns all expected receipts
- compare `inWarehouse` count vs number of `receipts` returned
- widen or parameterize date window if necessary; website shows receipts in 3-month windows
- persist request metadata (`startDate`, `endDate`, `documentType`, `documentSubType`)
- emit warning when receipt counts mismatch
** notes
- goal is to confirm we are enumerating all receipts before parsing
- do not expand schema or parser logic in this task
- keep changes limited to summary query handling and diagnostics
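for example, a six-month range splits into two requests of at most 92 days each, matching the unit test later in this diff:

```python
from scrape_costco import build_date_windows

windows = build_date_windows("1/01/2026", "6/30/2026", window_days=92)
assert windows == [
    {"startDate": "1/01/2026", "endDate": "4/02/2026"},
    {"startDate": "4/03/2026", "endDate": "6/30/2026"},
]
```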
** evidence
- commit: `ac82fa6` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python scrape_costco.py --help`; reviewed the sample Costco summary request in `pm/scrape-giant.org` against `costco_output/raw/summary.json` and added 3-month window chunking plus mismatch diagnostics
- date: 2026-03-16
* [X] t1.8.5: refactor costco scraper auth and UX with giant scraper
** acceptance criteria
- remove manual auth env vars
- load costco cookies from firefox session
- require only logged-in browser
- replace start/end date flags with --months-back
- maintain same raw output structure
- ensure summary_lookup keys are collision-safe by using a composite key (transactionBarcode + transactionDateTime) instead of transactionBarcode alone
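for example, with the `receipt_key` helper shown later in this diff:

```python
from scrape_costco import receipt_key

# same barcode on two different visits stays distinct under the composite key
first = {"transactionBarcode": "dup", "transactionDateTime": "2026-03-12T16:16:00"}
second = {"transactionBarcode": "dup", "transactionDateTime": "2026-02-14T16:25:00"}
assert receipt_key(first) == "dup::2026-03-12T16:16:00"
assert receipt_key(first) != receipt_key(second)
```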
** notes
- align Costco acquisition ergonomics with the Giant scraper
- keep downstream Costco parsing and shared schemas unchanged
** evidence
- commit: `c0054dc` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python scrape_costco.py --help`; verified Costco summary/detail flattening now uses composite receipt keys in unit tests
- date: 2026-03-16
* [X] t1.8.6: add browser session helper (2-4 commits)
** acceptance criteria
- create a separate Python module/script that extracts firefox browser session data needed for giant and costco scrapers.
- support Firefox (browser) and Costco (retailer) first, including:
- loading cookies via existing browser-cookie approach
- reading browser storage needed for dynamic auth headers (e.g. Costco bearer token)
- copying locked browser sqlite/db files to a temp location before reading when necessary
- expose a small interface usable by scrapers, e.g. cookie jar + storage/header values
- keep retailer-specific parsing of extracted session data outside the low-level browser access layer
- structure the helper so Chromium-family browser support can be added later without changing scraper call sites
** notes
- goal is to replace manual `.env` copying of volatile browser-derived auth data
- session bootstrap only, not full browser automation
- prefer one shared helper over retailer-specific ad hoc storage reads
- Firefox only; Chromium support later
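the interface could look roughly like this (a sketch; the names match the imports used by the scrapers in this diff, but the bodies are illustrative, and real profiles may store compressed values where the unit tests use plain text):

```python
import shutil
import sqlite3
import tempfile
from pathlib import Path

def copy_locked_sqlite(db_path: Path) -> Path:
    """Copy a possibly-locked browser sqlite file aside before reading it."""
    tmpdir = Path(tempfile.mkdtemp())
    return Path(shutil.copy2(db_path, tmpdir / db_path.name))

def read_firefox_local_storage(profile_dir: Path, origin_filter: str) -> dict:
    """Return key/value pairs from ls/data.sqlite stores of matching origins."""
    values = {}
    storage_root = profile_dir / "storage" / "default"
    for db_path in storage_root.glob(f"*{origin_filter}*/ls/data.sqlite"):
        with sqlite3.connect(copy_locked_sqlite(db_path)) as connection:
            for key, value in connection.execute("SELECT key, value FROM data"):
                values[str(key)] = value if isinstance(value, str) else ""
    return values
```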
** evidence
- commit: `7789c2e` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python scrape_giant.py --help`; `./venv/bin/python scrape_costco.py --help`; verified Firefox storage token extraction and locked-db copy behavior in unit tests
- date: 2026-03-16
* [ ] t1.8.7: simplify costco session bootstrap and remove over-abstraction (2-4 commits)
** acceptance criteria
- make `scrape_costco.py` readable end-to-end without tracing through multiple partial bootstrap layers
- keep `browser_session.py` limited to low-level browser data access only:
- firefox profile discovery
- cookie loading
- storage reads
- sqlite copy/read helpers
- remove or sharply reduce `retailer_sessions.py` so retailer-specific header extraction lives with the retailer scraper or in a very small retailer-specific helper
- make session bootstrap flow explicit and linear:
- load browser context
- extract costco auth values
- build request headers
- build requests session
- eliminate inconsistent/obsolete function signatures and dead call paths (e.g. mixed `build_session(...)` calling conventions, stale fallback branches, mismatched `build_headers(...)` args)
- add one focused bootstrap debug print showing whether cookies, authorization, client id, and client identifier were found
- preserve current working behavior where available; this is a refactor/clarification task, not a feature expansion task
** notes
- goal is to restore concern separation and debuggability
- prefer obvious retailer-specific code over “generic” helpers that guess and obscure control flow
- browser access can stay shared; retailer auth mapping should be explicit
- no new heuristics in this task
- this is the proof that the architecture generalizes
- don't chase perfection before the second retailer lands
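the target flow, spelled out (a sketch reusing `load_firefox_cookies`, `read_firefox_local_storage`, and `build_headers` from this diff; not final code):

```python
from curl_cffi import requests

def bootstrap_costco_session(profile_dir):
    # 1. load browser context
    cookies = load_firefox_cookies(".costco.com", profile_dir)
    # 2. extract costco auth values
    storage = read_firefox_local_storage(profile_dir, "costco.com")
    id_token = storage.get("idToken", "")
    client_id = storage.get("clientID", "")
    # 3. build request headers
    headers = build_headers({
        "costco-x-authorization": f"Bearer {id_token}" if id_token else "",
        "costco-x-wcs-clientId": client_id,
    })
    # 4. build requests session
    session = requests.Session()
    session.cookies.update(cookies)
    session.headers.update(headers)
    # the one focused bootstrap debug line
    print(f"bootstrap: cookies={bool(cookies)} authorization={bool(id_token)} clientId={bool(client_id)}")
    return session
```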
** evidence
- commit:
- tests:
- date:
* [ ] t1.9: compute normalized comparison metrics (2-4 commits)
* [ ] t1.9: compute normalized comparison metrics (2-3 commits)
** acceptance criteria
- derive normalized comparison fields where possible on enriched or observed product rows:
- `price_per_lb`
- `price_per_oz`
- `price_per_each`
- `price_per_count`
- preserve the source basis used to derive each metric, e.g.:
- parsed size/unit
- receipt weight
- explicit count/pack
- emit nulls when basis is unknown, conflicting, or ambiguous
- document at least one Giant vs Costco comparison example using the normalized metrics
- derive normalized comparison fields where possible:
- price per lb
- price per oz
- price per each
- price per count
- metrics are attached at canonical or linked-observed level as appropriate
- emit obvious nulls when basis is unknown rather than inventing values
** notes
- compute metrics as close to the raw observation as possible
- canonical layer can aggregate later, but should not invent missing unit economics
- unit discipline matters more than coverage
- this is where “gala apples 5 lb bag vs other gala apples” becomes possible
- units discipline matters a lot here
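a basis-aware sketch of the derivation (field names follow the enriched/observed schemas in this diff; the function itself is illustrative):

```python
def to_float(value):
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

def normalized_metrics(row):
    """Return {metric: (value, basis)} pairs; unknown basis -> no metric."""
    total = to_float(row.get("line_total"))
    if total is None:
        return {}
    metrics = {}
    weight = to_float(row.get("picked_weight"))   # basis: receipt weight
    if weight and weight > 0:
        metrics["price_per_lb"] = (round(total / weight, 4), "receipt_weight")
    size = to_float(row.get("size_value"))        # basis: parsed size/unit
    if size and size > 0 and row.get("size_unit") == "oz":
        metrics["price_per_oz"] = (round(total / size, 4), "parsed_size")
    pack = to_float(row.get("pack_qty"))          # basis: explicit count/pack
    if pack and pack > 0:
        metrics["price_per_count"] = (round(total / pack, 4), "explicit_pack")
    return metrics
```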
** evidence
- commit:


254
scrape-click.py Normal file

@@ -0,0 +1,254 @@
import json
import time
from pathlib import Path
import browser_cookie3
import click
import pandas as pd
from curl_cffi import requests
from dotenv import load_dotenv
import os
BASE = "https://giantfood.com"
ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store"
def load_config():
load_dotenv()
return {
"user_id": os.getenv("GIANT_USER_ID", "").strip(),
"loyalty": os.getenv("GIANT_LOYALTY_NUMBER", "").strip(),
}
def build_session():
s = requests.Session()
s.cookies.update(browser_cookie3.firefox(domain_name="giantfood.com"))
s.headers.update({
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) Gecko/20100101 Firefox/148.0",
"accept": "application/json, text/plain, */*",
"accept-language": "en-US,en;q=0.9",
"referer": ACCOUNT_PAGE,
})
return s
def safe_get(session, url, **kwargs):
last_response = None
for attempt in range(3):
try:
r = session.get(
url,
impersonate="firefox",
timeout=30,
**kwargs,
)
last_response = r
if r.status_code == 200:
return r
click.echo(f"retry {attempt + 1}/3 status={r.status_code}")
except Exception as e:
click.echo(f"retry {attempt + 1}/3 error={e}")
time.sleep(3)
if last_response is not None:
last_response.raise_for_status()
raise RuntimeError(f"failed to fetch {url}")
def get_history(session, user_id, loyalty):
url = f"{BASE}/api/v6.0/user/{user_id}/order/history"
r = safe_get(
session,
url,
params={
"filter": "instore",
"loyaltyNumber": loyalty,
},
)
return r.json()
def get_order_detail(session, user_id, order_id):
url = f"{BASE}/api/v6.0/user/{user_id}/order/history/detail/{order_id}"
r = safe_get(
session,
url,
params={"isInStore": "true"},
)
return r.json()
def flatten_orders(history, details):
orders = []
items = []
history_lookup = {
r["orderId"]: r
for r in history.get("records", [])
}
for d in details:
hist = history_lookup.get(d["orderId"], {})
pup = d.get("pup", {})
orders.append({
"order_id": d["orderId"],
"order_date": d.get("orderDate"),
"delivery_date": d.get("deliveryDate"),
"service_type": hist.get("serviceType"),
"order_total": d.get("orderTotal"),
"payment_method": d.get("paymentMethod"),
"total_item_count": d.get("totalItemCount"),
"total_savings": d.get("totalSavings"),
"your_savings_total": d.get("yourSavingsTotal"),
"coupons_discounts_total": d.get("couponsDiscountsTotal"),
"store_name": pup.get("storeName"),
"store_number": pup.get("aholdStoreNumber"),
"store_address1": pup.get("storeAddress1"),
"store_city": pup.get("storeCity"),
"store_state": pup.get("storeState"),
"store_zipcode": pup.get("storeZipcode"),
"refund_order": d.get("refundOrder"),
"ebt_order": d.get("ebtOrder"),
})
for i, item in enumerate(d.get("items", []), start=1):
items.append({
"order_id": d["orderId"],
"order_date": d.get("orderDate"),
"line_no": i,
"pod_id": item.get("podId"),
"item_name": item.get("itemName"),
"upc": item.get("primUpcCd"),
"category_id": item.get("categoryId"),
"category": item.get("categoryDesc"),
"qty": item.get("shipQy"),
"unit": item.get("lbEachCd"),
"unit_price": item.get("unitPrice"),
"line_total": item.get("groceryAmount"),
"picked_weight": item.get("totalPickedWeight"),
"mvp_savings": item.get("mvpSavings"),
"reward_savings": item.get("rewardSavings"),
"coupon_savings": item.get("couponSavings"),
"coupon_price": item.get("couponPrice"),
})
return pd.DataFrame(orders), pd.DataFrame(items)
def read_existing_order_ids(orders_csv: Path) -> set[str]:
if not orders_csv.exists():
return set()
try:
df = pd.read_csv(orders_csv, dtype={"order_id": str})
if "order_id" not in df.columns:
return set()
return set(df["order_id"].dropna().astype(str))
except Exception:
return set()
def append_dedup(existing_path: Path, new_df: pd.DataFrame, subset: list[str]) -> pd.DataFrame:
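    # merge with whatever is already on disk and keep the newest copy of each
    # key, so reruns stay idempotent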
if existing_path.exists():
old_df = pd.read_csv(existing_path, dtype=str)
combined = pd.concat([old_df, new_df.astype(str)], ignore_index=True)
else:
combined = new_df.astype(str).copy()
combined = combined.drop_duplicates(subset=subset, keep="last")
combined.to_csv(existing_path, index=False)
return combined
@click.command()
@click.option("--user-id", default=None, help="giant user id")
@click.option("--loyalty", default=None, help="giant loyalty number")
@click.option("--outdir", default="giant_output", show_default=True, help="output directory")
@click.option("--sleep-seconds", default=1.5, show_default=True, type=float, help="delay between detail requests")
def main(user_id, loyalty, outdir, sleep_seconds):
cfg = load_config()
user_id = user_id or cfg["user_id"] or click.prompt("giant user id", type=str)
loyalty = loyalty or cfg["loyalty"] or click.prompt("giant loyalty number", type=str)
outdir = Path(outdir)
rawdir = outdir / "raw"
rawdir.mkdir(parents=True, exist_ok=True)
orders_csv = outdir / "orders.csv"
items_csv = outdir / "items.csv"
click.echo("using cookies from your current firefox profile.")
click.echo(f"open giant here, make sure you're logged in, then return: {ACCOUNT_PAGE}")
click.pause(info="press any key once giant is open and logged in")
session = build_session()
click.echo("fetching order history...")
history = get_history(session, user_id, loyalty)
(rawdir / "history.json").write_text(
json.dumps(history, indent=2),
encoding="utf-8",
)
records = history.get("records", [])
click.echo(f"history returned {len(records)} visits")
click.echo("tip: giant appears to expose only the most recent 50 visits, so run this periodically if you want full continuity.")
history_order_ids = [str(r["orderId"]) for r in records]
existing_order_ids = read_existing_order_ids(orders_csv)
new_order_ids = [oid for oid in history_order_ids if oid not in existing_order_ids]
click.echo(f"existing orders in csv: {len(existing_order_ids)}")
click.echo(f"new orders to fetch: {len(new_order_ids)}")
if not new_order_ids:
click.echo("no new orders found. done.")
return
details = []
for order_id in new_order_ids:
click.echo(f"fetching {order_id}")
d = get_order_detail(session, user_id, order_id)
details.append(d)
(rawdir / f"{order_id}.json").write_text(
json.dumps(d, indent=2),
encoding="utf-8",
)
time.sleep(sleep_seconds)
click.echo("flattening new data...")
orders_df, items_df = flatten_orders(history, details)
orders_all = append_dedup(
orders_csv,
orders_df,
subset=["order_id"],
)
items_all = append_dedup(
items_csv,
items_df,
subset=["order_id", "line_no", "item_name", "upc", "line_total"],
)
click.echo("done")
click.echo(f"orders csv: {orders_csv}")
click.echo(f"items csv: {items_csv}")
click.echo(f"total orders stored: {len(orders_all)}")
click.echo(f"total item rows stored: {len(items_all)}")
if __name__ == "__main__":
main()


@@ -1,710 +0,0 @@
import os
import csv
import json
import time
import re
from pathlib import Path
from calendar import monthrange
from datetime import datetime, timedelta
from dotenv import load_dotenv
import click
from curl_cffi import requests
from browser_session import (
find_firefox_profile_dir,
load_firefox_cookies,
read_firefox_local_storage,
read_firefox_webapps_store,
)
BASE_URL = "https://ecom-api.costco.com/ebusiness/order/v1/orders/graphql"
RETAILER = "costco"
SUMMARY_QUERY = """
query receiptsWithCounts($startDate: String!, $endDate: String!, $documentType: String!, $documentSubType: String!) {
receiptsWithCounts(startDate: $startDate, endDate: $endDate, documentType: $documentType, documentSubType: $documentSubType) {
inWarehouse
gasStation
carWash
gasAndCarWash
receipts {
warehouseName
receiptType
documentType
transactionDateTime
transactionBarcode
transactionType
total
totalItemCount
itemArray {
itemNumber
}
tenderArray {
tenderTypeCode
tenderDescription
amountTender
}
couponArray {
upcnumberCoupon
}
}
}
}
""".strip()
DETAIL_QUERY = """
query receiptsWithCounts($barcode: String!, $documentType: String!) {
receiptsWithCounts(barcode: $barcode, documentType: $documentType) {
receipts {
warehouseName
receiptType
documentType
transactionDateTime
transactionDate
companyNumber
warehouseNumber
operatorNumber
warehouseShortName
registerNumber
transactionNumber
transactionType
transactionBarcode
warehouseAddress1
warehouseAddress2
warehouseCity
warehouseState
warehouseCountry
warehousePostalCode
totalItemCount
subTotal
taxes
total
invoiceNumber
sequenceNumber
itemArray {
itemNumber
itemDescription01
frenchItemDescription1
itemDescription02
frenchItemDescription2
itemIdentifier
itemDepartmentNumber
unit
amount
taxFlag
merchantID
entryMethod
transDepartmentNumber
fuelUnitQuantity
fuelGradeCode
itemUnitPriceAmount
fuelUomCode
fuelUomDescription
fuelUomDescriptionFr
fuelGradeDescription
fuelGradeDescriptionFr
}
tenderArray {
tenderTypeCode
tenderSubTypeCode
tenderDescription
amountTender
displayAccountNumber
sequenceNumber
approvalNumber
responseCode
tenderTypeName
transactionID
merchantID
entryMethod
tenderAcctTxnNumber
tenderAuthorizationCode
tenderTypeNameFr
tenderEntryMethodDescription
walletType
walletId
storedValueBucket
}
subTaxes {
tax1
tax2
tax3
tax4
aTaxPercent
aTaxLegend
aTaxAmount
aTaxPrintCode
aTaxPrintCodeFR
aTaxIdentifierCode
bTaxPercent
bTaxLegend
bTaxAmount
bTaxPrintCode
bTaxPrintCodeFR
bTaxIdentifierCode
cTaxPercent
cTaxLegend
cTaxAmount
cTaxIdentifierCode
dTaxPercent
dTaxLegend
dTaxAmount
dTaxPrintCode
dTaxPrintCodeFR
dTaxIdentifierCode
uTaxLegend
uTaxAmount
uTaxableAmount
}
instantSavings
membershipNumber
}
}
}
""".strip()
ORDER_FIELDS = [
"retailer",
"order_id",
"order_date",
"delivery_date",
"service_type",
"order_total",
"payment_method",
"total_item_count",
"total_savings",
"your_savings_total",
"coupons_discounts_total",
"store_name",
"store_number",
"store_address1",
"store_city",
"store_state",
"store_zipcode",
"refund_order",
"ebt_order",
"raw_history_path",
"raw_order_path",
]
ITEM_FIELDS = [
"retailer",
"order_id",
"line_no",
"order_date",
"retailer_item_id",
"pod_id",
"item_name",
"upc",
"category_id",
"category",
"qty",
"unit",
"unit_price",
"line_total",
"picked_weight",
"mvp_savings",
"reward_savings",
"coupon_savings",
"coupon_price",
"image_url",
"raw_order_path",
"is_discount_line",
"is_coupon_line",
]
COSTCO_STORAGE_ORIGIN = "costco.com"
COSTCO_ID_TOKEN_STORAGE_KEY = "idToken"
COSTCO_CLIENT_ID_STORAGE_KEY = "clientID"
def load_config():
load_dotenv()
return {
"authorization": os.getenv("COSTCO_X_AUTHORIZATION", "").strip(),
"client_id": os.getenv("COSTCO_X_WCS_CLIENTID", "").strip(),
"client_identifier": os.getenv("COSTCO_CLIENT_IDENTIFIER", "").strip(),
}
def build_headers(auth_headers):
headers = {
"accept": "*/*",
"content-type": "application/json-patch+json",
"costco.service": "restOrders",
"costco.env": "ecom",
"origin": "https://www.costco.com",
"referer": "https://www.costco.com/",
"user-agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) "
"Gecko/20100101 Firefox/148.0"
),
}
headers.update(auth_headers)
return headers
def load_costco_browser_headers(profile_dir, authorization, client_id, client_identifier):
local_storage = read_firefox_local_storage(profile_dir, COSTCO_STORAGE_ORIGIN)
webapps_store = read_firefox_webapps_store(profile_dir, COSTCO_STORAGE_ORIGIN)
auth_header = authorization.strip() if authorization else ""
if client_id:
client_id = client_id.strip()
if client_identifier:
client_identifier = client_identifier.strip()
if not auth_header:
id_token = (
local_storage.get(COSTCO_ID_TOKEN_STORAGE_KEY, "").strip()
or webapps_store.get(COSTCO_ID_TOKEN_STORAGE_KEY, "").strip()
)
if id_token:
auth_header = f"Bearer {id_token}"
client_id = client_id or (
local_storage.get(COSTCO_CLIENT_ID_STORAGE_KEY, "").strip()
or webapps_store.get(COSTCO_CLIENT_ID_STORAGE_KEY, "").strip()
)
if not auth_header:
raise click.ClickException(
"could not find Costco auth token; set COSTCO_X_AUTHORIZATION or load Firefox idToken"
)
if not client_id or not client_identifier:
raise click.ClickException(
"missing Costco client ids; set COSTCO_X_WCS_CLIENTID and COSTCO_CLIENT_IDENTIFIER"
)
return {
"costco-x-authorization": auth_header,
"costco-x-wcs-clientId": client_id,
"client-identifier": client_identifier,
}
def build_session(profile_dir, auth_headers):
    session = requests.Session()
    session.cookies.update(load_firefox_cookies(".costco.com", profile_dir))
    # build_headers() already merges auth_headers into the defaults
    session.headers.update(build_headers(auth_headers))
    return session
def graphql_post(session, query, variables):
last_response = None
for attempt in range(3):
try:
response = session.post(
BASE_URL,
json={"query": query, "variables": variables},
impersonate="firefox",
timeout=30,
)
last_response = response
if response.status_code == 200:
return response.json()
click.echo(f"retry {attempt + 1}/3 status={response.status_code} body={response.text[:500]}")
except Exception as exc: # pragma: no cover - network error path
click.echo(f"retry {attempt + 1}/3 error={exc}")
time.sleep(3)
if last_response is not None:
last_response.raise_for_status()
raise RuntimeError("failed to fetch Costco GraphQL payload")
def safe_filename(value):
return re.sub(r'[<>:"/\\|?*]+', "-", str(value))
def summary_receipts(payload):
return payload.get("data", {}).get("receiptsWithCounts", {}).get("receipts", [])
def detail_receipts(payload):
return payload.get("data", {}).get("receiptsWithCounts", {}).get("receipts", [])
def summary_counts(payload):
counts = payload.get("data", {}).get("receiptsWithCounts", {})
return {
"inWarehouse": counts.get("inWarehouse", 0) or 0,
"gasStation": counts.get("gasStation", 0) or 0,
"carWash": counts.get("carWash", 0) or 0,
"gasAndCarWash": counts.get("gasAndCarWash", 0) or 0,
}
def parse_cli_date(value):
return datetime.strptime(value, "%m/%d/%Y").date()
def format_cli_date(value):
return f"{value.month}/{value.day:02d}/{value.year}"
def subtract_months(value, months):
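    # step the month back, then clamp the day to the target month's length
    # (e.g. 3/31 minus one month -> 2/28, or 2/29 in a leap year)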
year = value.year
month = value.month - months
while month <= 0:
month += 12
year -= 1
day = min(value.day, monthrange(year, month)[1])
return value.replace(year=year, month=month, day=day)
def resolve_date_range(months_back, today=None):
if months_back < 1:
raise click.ClickException("months-back must be at least 1")
end = today or datetime.now().date()
start = subtract_months(end, months_back)
return format_cli_date(start), format_cli_date(end)
def build_date_windows(start_date, end_date, window_days):
start = parse_cli_date(start_date)
end = parse_cli_date(end_date)
if end < start:
raise click.ClickException("end-date must be on or after start-date")
if window_days < 1:
raise click.ClickException("window-days must be at least 1")
windows = []
current = start
while current <= end:
window_end = min(current + timedelta(days=window_days - 1), end)
windows.append(
{
"startDate": format_cli_date(current),
"endDate": format_cli_date(window_end),
}
)
current = window_end + timedelta(days=1)
return windows
def unique_receipts(receipts):
by_barcode = {}
for receipt in receipts:
key = receipt_key(receipt)
if key:
by_barcode[key] = receipt
return list(by_barcode.values())
def receipt_key(receipt):
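    # a barcode alone can repeat across visits, so pair it with the transaction
    # timestamp to keep summary/detail lookups collision-safe (t1.8.5)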
barcode = receipt.get("transactionBarcode", "")
transaction_date_time = receipt.get("transactionDateTime", "")
if not barcode:
return ""
return f"{barcode}::{transaction_date_time}"
def fetch_summary_windows(
session,
start_date,
end_date,
document_type,
document_sub_type,
window_days,
):
requests_metadata = []
combined_receipts = []
for window in build_date_windows(start_date, end_date, window_days):
variables = {
"startDate": window["startDate"],
"endDate": window["endDate"],
"text": "custom",
"documentType": document_type,
"documentSubType": document_sub_type,
}
payload = graphql_post(session, SUMMARY_QUERY, variables)
receipts = summary_receipts(payload)
counts = summary_counts(payload)
warehouse_count = sum(
1 for receipt in receipts if receipt.get("receiptType") == "In-Warehouse"
)
mismatch = counts["inWarehouse"] != warehouse_count
requests_metadata.append(
{
**variables,
"returnedReceipts": len(receipts),
"returnedInWarehouseReceipts": warehouse_count,
"inWarehouse": counts["inWarehouse"],
"gasStation": counts["gasStation"],
"carWash": counts["carWash"],
"gasAndCarWash": counts["gasAndCarWash"],
"countMismatch": mismatch,
}
)
if mismatch:
click.echo(
(
"warning: summary count mismatch for "
f"{window['startDate']} to {window['endDate']}: "
f"inWarehouse={counts['inWarehouse']} "
f"returnedInWarehouseReceipts={warehouse_count}"
),
err=True,
)
combined_receipts.extend(receipts)
unique = unique_receipts(combined_receipts)
aggregate_payload = {
"data": {
"receiptsWithCounts": {
"inWarehouse": sum(row["inWarehouse"] for row in requests_metadata),
"gasStation": sum(row["gasStation"] for row in requests_metadata),
"carWash": sum(row["carWash"] for row in requests_metadata),
"gasAndCarWash": sum(row["gasAndCarWash"] for row in requests_metadata),
"receipts": unique,
}
}
}
return aggregate_payload, requests_metadata
def flatten_costco_data(summary_payload, detail_payloads, raw_dir):
summary_lookup = {
receipt_key(receipt): receipt
for receipt in summary_receipts(summary_payload)
if receipt_key(receipt)
}
orders = []
items = []
for detail_payload in detail_payloads:
for receipt in detail_receipts(detail_payload):
order_id = receipt["transactionBarcode"]
receipt_id = receipt_key(receipt)
summary_row = summary_lookup.get(receipt_id, {})
coupon_numbers = {
row.get("upcnumberCoupon", "")
for row in summary_row.get("couponArray", []) or []
if row.get("upcnumberCoupon")
}
raw_order_path = raw_dir / f"{safe_filename(receipt_id or order_id)}.json"
orders.append(
{
"retailer": RETAILER,
"order_id": order_id,
"order_date": receipt.get("transactionDate", ""),
"delivery_date": receipt.get("transactionDate", ""),
"service_type": receipt.get("receiptType", ""),
"order_total": stringify(receipt.get("total")),
"payment_method": compact_join(
summary_row.get("tenderArray", []) or [], "tenderDescription"
),
"total_item_count": stringify(receipt.get("totalItemCount")),
"total_savings": stringify(receipt.get("instantSavings")),
"your_savings_total": stringify(receipt.get("instantSavings")),
"coupons_discounts_total": stringify(receipt.get("instantSavings")),
"store_name": receipt.get("warehouseName", ""),
"store_number": stringify(receipt.get("warehouseNumber")),
"store_address1": receipt.get("warehouseAddress1", ""),
"store_city": receipt.get("warehouseCity", ""),
"store_state": receipt.get("warehouseState", ""),
"store_zipcode": receipt.get("warehousePostalCode", ""),
"refund_order": "false",
"ebt_order": "false",
"raw_history_path": (raw_dir / "summary.json").as_posix(),
"raw_order_path": raw_order_path.as_posix(),
}
)
for line_no, item in enumerate(receipt.get("itemArray", []), start=1):
item_number = stringify(item.get("itemNumber"))
description = join_descriptions(
item.get("itemDescription01"), item.get("itemDescription02")
)
is_discount = is_discount_line(item)
is_coupon = is_discount and (
item_number in coupon_numbers
or description.startswith("/")
)
items.append(
{
"retailer": RETAILER,
"order_id": order_id,
"line_no": str(line_no),
"order_date": receipt.get("transactionDate", ""),
"retailer_item_id": item_number,
"pod_id": "",
"item_name": description,
"upc": "",
"category_id": stringify(item.get("itemDepartmentNumber")),
"category": stringify(item.get("transDepartmentNumber")),
"qty": stringify(item.get("unit")),
"unit": stringify(item.get("itemIdentifier")),
"unit_price": stringify(item.get("itemUnitPriceAmount")),
"line_total": stringify(item.get("amount")),
"picked_weight": "",
"mvp_savings": "",
"reward_savings": "",
"coupon_savings": stringify(item.get("amount") if is_coupon else ""),
"coupon_price": "",
"image_url": "",
"raw_order_path": raw_order_path.as_posix(),
"is_discount_line": "true" if is_discount else "false",
"is_coupon_line": "true" if is_coupon else "false",
}
)
return orders, items
def join_descriptions(*parts):
return " ".join(str(part).strip() for part in parts if part).strip()
def compact_join(rows, field):
values = [str(row.get(field, "")).strip() for row in rows if row.get(field)]
return " | ".join(values)
def is_discount_line(item):
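    # costco adjustment rows carry negative amounts/units or a description
    # starting with "/" that references the discounted item number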
amount = item.get("amount")
unit = item.get("unit")
description = join_descriptions(
item.get("itemDescription01"), item.get("itemDescription02")
)
try:
amount_val = float(amount)
except (TypeError, ValueError):
amount_val = 0.0
try:
unit_val = float(unit)
except (TypeError, ValueError):
unit_val = 0.0
return amount_val < 0 or unit_val < 0 or description.startswith("/")
def stringify(value):
if value is None:
return ""
return str(value)
def write_json(path, payload):
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
def write_csv(path, rows, fieldnames):
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
@click.command()
@click.option(
"--outdir",
default="costco_output",
show_default=True,
help="Output directory for Costco raw and flattened files.",
)
@click.option(
"--document-type",
default="all",
show_default=True,
help="Summary document type.",
)
@click.option(
"--document-sub-type",
default="all",
show_default=True,
help="Summary document sub type.",
)
@click.option(
"--window-days",
default=92,
show_default=True,
type=int,
help="Maximum number of days to request per summary window.",
)
@click.option(
"--months-back",
default=36,
show_default=True,
type=int,
help="How many months of receipts to enumerate back from today.",
)
@click.option(
"--firefox-profile-dir",
default=None,
help="Firefox profile directory to use for cookies and session storage.",
)
def main(
outdir,
document_type,
document_sub_type,
window_days,
months_back,
firefox_profile_dir,
):
outdir = Path(outdir)
raw_dir = outdir / "raw"
config = load_config()
profile_dir = Path(firefox_profile_dir) if firefox_profile_dir else None
if profile_dir is None:
try:
profile_dir = find_firefox_profile_dir()
except Exception:
profile_dir = click.prompt(
"Firefox profile dir",
type=click.Path(exists=True, file_okay=False, path_type=Path),
)
auth_headers = load_costco_browser_headers(
profile_dir,
authorization=config["authorization"],
client_id=config["client_id"],
client_identifier=config["client_identifier"],
)
session = build_session(profile_dir, auth_headers)
start_date, end_date = resolve_date_range(months_back)
summary_payload, request_metadata = fetch_summary_windows(
session,
start_date,
end_date,
document_type,
document_sub_type,
window_days,
)
write_json(raw_dir / "summary.json", summary_payload)
write_json(raw_dir / "summary_requests.json", request_metadata)
receipts = summary_receipts(summary_payload)
detail_payloads = []
for receipt in receipts:
barcode = receipt["transactionBarcode"]
receipt_id = receipt_key(receipt) or barcode
click.echo(f"fetching {barcode}")
detail_payload = graphql_post(
session,
DETAIL_QUERY,
{"barcode": barcode, "documentType": "warehouse"},
)
detail_payloads.append(detail_payload)
write_json(raw_dir / f"{safe_filename(receipt_id)}.json", detail_payload)
orders, items = flatten_costco_data(summary_payload, detail_payloads, raw_dir)
write_csv(outdir / "orders.csv", orders, ORDER_FIELDS)
write_csv(outdir / "items.csv", items, ITEM_FIELDS)
click.echo(f"wrote {len(orders)} orders and {len(items)} item rows to {outdir}")
if __name__ == "__main__":
main()


@@ -1,333 +0,0 @@
import csv
import json
import os
import time
from pathlib import Path
import click
from dotenv import load_dotenv
from curl_cffi import requests
from browser_session import find_firefox_profile_dir, load_firefox_cookies
BASE = "https://giantfood.com"
ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store"
ORDER_FIELDS = [
"order_id",
"order_date",
"delivery_date",
"service_type",
"order_total",
"payment_method",
"total_item_count",
"total_savings",
"your_savings_total",
"coupons_discounts_total",
"store_name",
"store_number",
"store_address1",
"store_city",
"store_state",
"store_zipcode",
"refund_order",
"ebt_order",
]
ITEM_FIELDS = [
"order_id",
"order_date",
"line_no",
"pod_id",
"item_name",
"upc",
"category_id",
"category",
"qty",
"unit",
"unit_price",
"line_total",
"picked_weight",
"mvp_savings",
"reward_savings",
"coupon_savings",
"coupon_price",
]
def load_config():
    # dotenv is imported unconditionally above, so no import guard is needed
    load_dotenv()
return {
"user_id": os.getenv("GIANT_USER_ID", "").strip(),
"loyalty": os.getenv("GIANT_LOYALTY_NUMBER", "").strip(),
}
def build_session():
profile_dir = find_firefox_profile_dir()
session = requests.Session()
session.cookies.update(load_firefox_cookies("giantfood.com", profile_dir))
session.headers.update(
{
"user-agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) "
"Gecko/20100101 Firefox/148.0"
),
"accept": "application/json, text/plain, */*",
"accept-language": "en-US,en;q=0.9",
"referer": ACCOUNT_PAGE,
}
)
return session
def safe_get(session, url, **kwargs):
last_response = None
for attempt in range(3):
try:
response = session.get(
url,
impersonate="firefox",
timeout=30,
**kwargs,
)
last_response = response
if response.status_code == 200:
return response
click.echo(f"retry {attempt + 1}/3 status={response.status_code}")
except Exception as exc: # pragma: no cover - network error path
click.echo(f"retry {attempt + 1}/3 error={exc}")
time.sleep(3)
if last_response is not None:
last_response.raise_for_status()
raise RuntimeError(f"failed to fetch {url}")
def get_history(session, user_id, loyalty):
response = safe_get(
session,
f"{BASE}/api/v6.0/user/{user_id}/order/history",
params={"filter": "instore", "loyaltyNumber": loyalty},
)
return response.json()
def get_order_detail(session, user_id, order_id):
response = safe_get(
session,
f"{BASE}/api/v6.0/user/{user_id}/order/history/detail/{order_id}",
params={"isInStore": "true"},
)
return response.json()
def flatten_orders(history, details):
orders = []
items = []
history_lookup = {record["orderId"]: record for record in history.get("records", [])}
for detail in details:
order_id = str(detail["orderId"])
history_row = history_lookup.get(detail["orderId"], {})
pickup = detail.get("pup", {})
orders.append(
{
"order_id": order_id,
"order_date": detail.get("orderDate"),
"delivery_date": detail.get("deliveryDate"),
"service_type": history_row.get("serviceType"),
"order_total": detail.get("orderTotal"),
"payment_method": detail.get("paymentMethod"),
"total_item_count": detail.get("totalItemCount"),
"total_savings": detail.get("totalSavings"),
"your_savings_total": detail.get("yourSavingsTotal"),
"coupons_discounts_total": detail.get("couponsDiscountsTotal"),
"store_name": pickup.get("storeName"),
"store_number": pickup.get("aholdStoreNumber"),
"store_address1": pickup.get("storeAddress1"),
"store_city": pickup.get("storeCity"),
"store_state": pickup.get("storeState"),
"store_zipcode": pickup.get("storeZipcode"),
"refund_order": detail.get("refundOrder"),
"ebt_order": detail.get("ebtOrder"),
}
)
for line_no, item in enumerate(detail.get("items", []), start=1):
items.append(
{
"order_id": order_id,
"order_date": detail.get("orderDate"),
"line_no": str(line_no),
"pod_id": item.get("podId"),
"item_name": item.get("itemName"),
"upc": item.get("primUpcCd"),
"category_id": item.get("categoryId"),
"category": item.get("categoryDesc"),
"qty": item.get("shipQy"),
"unit": item.get("lbEachCd"),
"unit_price": item.get("unitPrice"),
"line_total": item.get("groceryAmount"),
"picked_weight": item.get("totalPickedWeight"),
"mvp_savings": item.get("mvpSavings"),
"reward_savings": item.get("rewardSavings"),
"coupon_savings": item.get("couponSavings"),
"coupon_price": item.get("couponPrice"),
}
)
return orders, items
def normalize_row(row, fieldnames):
return {field: stringify(row.get(field)) for field in fieldnames}
def stringify(value):
if value is None:
return ""
return str(value)
def read_csv_rows(path):
if not path.exists():
return [], []
with path.open(newline="", encoding="utf-8") as handle:
reader = csv.DictReader(handle)
fieldnames = reader.fieldnames or []
return fieldnames, list(reader)
def read_existing_order_ids(path):
_, rows = read_csv_rows(path)
return {row["order_id"] for row in rows if row.get("order_id")}
def merge_rows(existing_rows, new_rows, subset):
merged = []
row_index = {}
for row in existing_rows + new_rows:
key = tuple(stringify(row.get(field)) for field in subset)
normalized = dict(row)
if key in row_index:
merged[row_index[key]] = normalized
else:
row_index[key] = len(merged)
merged.append(normalized)
return merged
def append_dedup(path, new_rows, subset, fieldnames):
existing_fieldnames, existing_rows = read_csv_rows(path)
all_fieldnames = list(dict.fromkeys(existing_fieldnames + fieldnames))
merged = merge_rows(
[normalize_row(row, all_fieldnames) for row in existing_rows],
[normalize_row(row, all_fieldnames) for row in new_rows],
subset=subset,
)
with path.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=all_fieldnames)
writer.writeheader()
writer.writerows(merged)
return merged
def write_json(path, payload):
path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
@click.command()
@click.option("--user-id", default=None, help="Giant user id.")
@click.option("--loyalty", default=None, help="Giant loyalty number.")
@click.option(
"--outdir",
default="giant_output",
show_default=True,
help="Directory for raw json and csv outputs.",
)
@click.option(
"--sleep-seconds",
default=1.5,
show_default=True,
type=float,
help="Delay between order detail requests.",
)
def main(user_id, loyalty, outdir, sleep_seconds):
config = load_config()
user_id = user_id or config["user_id"] or click.prompt("Giant user id", type=str)
loyalty = loyalty or config["loyalty"] or click.prompt(
"Giant loyalty number", type=str
)
outdir = Path(outdir)
rawdir = outdir / "raw"
rawdir.mkdir(parents=True, exist_ok=True)
orders_csv = outdir / "orders.csv"
items_csv = outdir / "items.csv"
existing_order_ids = read_existing_order_ids(orders_csv)
session = build_session()
history = get_history(session, user_id, loyalty)
write_json(rawdir / "history.json", history)
records = history.get("records", [])
click.echo(f"history returned {len(records)} visits; Giant exposes only the most recent 50")
unseen_records = [
record
for record in records
if stringify(record.get("orderId")) not in existing_order_ids
]
click.echo(
f"found {len(unseen_records)} unseen visits "
f"({len(existing_order_ids)} already stored)"
)
details = []
for index, record in enumerate(unseen_records, start=1):
order_id = stringify(record.get("orderId"))
click.echo(f"[{index}/{len(unseen_records)}] fetching {order_id}")
detail = get_order_detail(session, user_id, order_id)
write_json(rawdir / f"{order_id}.json", detail)
details.append(detail)
if index < len(unseen_records):
time.sleep(sleep_seconds)
orders, items = flatten_orders(history, details)
merged_orders = append_dedup(
orders_csv,
orders,
subset=["order_id"],
fieldnames=ORDER_FIELDS,
)
merged_items = append_dedup(
items_csv,
items,
subset=["order_id", "line_no"],
fieldnames=ITEM_FIELDS,
)
click.echo(
f"wrote {len(orders)} new orders / {len(items)} new items "
f"({len(merged_orders)} total orders, {len(merged_items)} total items)"
)
if __name__ == "__main__":
main()


@@ -1,4 +1,180 @@
from scrape_giant import * # noqa: F401,F403
import json
import time
from pathlib import Path
import browser_cookie3
import pandas as pd
from curl_cffi import requests
BASE = "https://giantfood.com"
ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store"
USER_ID = "369513017"
LOYALTY = "440155630880"
def build_session():
s = requests.Session()
s.cookies.update(browser_cookie3.firefox(domain_name="giantfood.com"))
s.headers.update({
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) Gecko/20100101 Firefox/148.0",
"accept": "application/json, text/plain, */*",
"accept-language": "en-US,en;q=0.9",
"referer": ACCOUNT_PAGE,
})
return s
def safe_get(session, url, **kwargs):
last_response = None
for attempt in range(3):
try:
r = session.get(
url,
impersonate="firefox",
timeout=30,
**kwargs,
)
last_response = r
if r.status_code == 200:
return r
print(f"retry {attempt + 1}/3 status={r.status_code}")
except Exception as e:
print(f"retry {attempt + 1}/3 error={e}")
time.sleep(3)
if last_response is not None:
last_response.raise_for_status()
raise RuntimeError(f"failed to fetch {url}")
def get_history(session):
url = f"{BASE}/api/v6.0/user/{USER_ID}/order/history"
r = safe_get(
session,
url,
params={
"filter": "instore",
"loyaltyNumber": LOYALTY,
},
)
return r.json()
def get_order_detail(session, order_id):
url = f"{BASE}/api/v6.0/user/{USER_ID}/order/history/detail/{order_id}"
r = safe_get(
session,
url,
params={"isInStore": "true"},
)
return r.json()
def flatten_orders(history, details):
orders = []
items = []
history_lookup = {
r["orderId"]: r
for r in history.get("records", [])
}
for d in details:
hist = history_lookup.get(d["orderId"], {})
pup = d.get("pup", {})
orders.append({
"order_id": d["orderId"],
"order_date": d.get("orderDate"),
"delivery_date": d.get("deliveryDate"),
"service_type": hist.get("serviceType"),
"order_total": d.get("orderTotal"),
"payment_method": d.get("paymentMethod"),
"total_item_count": d.get("totalItemCount"),
"total_savings": d.get("totalSavings"),
"your_savings_total": d.get("yourSavingsTotal"),
"coupons_discounts_total": d.get("couponsDiscountsTotal"),
"store_name": pup.get("storeName"),
"store_number": pup.get("aholdStoreNumber"),
"store_address1": pup.get("storeAddress1"),
"store_city": pup.get("storeCity"),
"store_state": pup.get("storeState"),
"store_zipcode": pup.get("storeZipcode"),
"refund_order": d.get("refundOrder"),
"ebt_order": d.get("ebtOrder"),
})
for i, item in enumerate(d.get("items", []), start=1):
items.append({
"order_id": d["orderId"],
"order_date": d.get("orderDate"),
"line_no": i,
"pod_id": item.get("podId"),
"item_name": item.get("itemName"),
"upc": item.get("primUpcCd"),
"category_id": item.get("categoryId"),
"category": item.get("categoryDesc"),
"qty": item.get("shipQy"),
"unit": item.get("lbEachCd"),
"unit_price": item.get("unitPrice"),
"line_total": item.get("groceryAmount"),
"picked_weight": item.get("totalPickedWeight"),
"mvp_savings": item.get("mvpSavings"),
"reward_savings": item.get("rewardSavings"),
"coupon_savings": item.get("couponSavings"),
"coupon_price": item.get("couponPrice"),
})
return pd.DataFrame(orders), pd.DataFrame(items)
def main():
outdir = Path("giant_output")
rawdir = outdir / "raw"
rawdir.mkdir(parents=True, exist_ok=True)
session = build_session()
print("fetching order history...")
history = get_history(session)
(rawdir / "history.json").write_text(
json.dumps(history, indent=2),
encoding="utf-8",
)
order_ids = [r["orderId"] for r in history.get("records", [])]
print(f"{len(order_ids)} orders found")
details = []
for order_id in order_ids:
print(f"fetching {order_id}")
d = get_order_detail(session, order_id)
details.append(d)
(rawdir / f"{order_id}.json").write_text(
json.dumps(d, indent=2),
encoding="utf-8",
)
time.sleep(1.5)
print("flattening data...")
orders_df, items_df = flatten_orders(history, details)
orders_df.to_csv(outdir / "orders.csv", index=False)
items_df.to_csv(outdir / "items.csv", index=False)
print("done")
print(f"{len(orders_df)} orders written to {outdir / 'orders.csv'}")
print(f"{len(items_df)} items written to {outdir / 'items.csv'}")
if __name__ == "__main__":
    main()

@@ -1,17 +1,28 @@
import unittest
import requests
import browser_cookie3
BASE = "https://giantfood.com"
ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store"
try:
import browser_cookie3 # noqa: F401
import requests # noqa: F401
except ImportError as exc: # pragma: no cover - dependency-gated smoke test
browser_cookie3 = None
_IMPORT_ERROR = exc
else:
_IMPORT_ERROR = None
USER_ID = "369513017"
LOYALTY = "440155630880"
cj = browser_cookie3.firefox(domain_name="giantfood.com")
@unittest.skipIf(browser_cookie3 is None, f"optional smoke test dependency missing: {_IMPORT_ERROR}")
class BrowserCookieSmokeTest(unittest.TestCase):
def test_dependencies_available(self):
self.assertIsNotNone(browser_cookie3)
s = requests.Session()
s.cookies.update(cj)
s.headers.update({
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) Gecko/20100101 Firefox/148.0",
"accept": "application/json, text/plain, */*",
"accept-language": "en-US,en;q=0.9",
"referer": ACCOUNT_PAGE,
})
r = s.get(
f"{BASE}/api/v6.0/user/{USER_ID}/order/history",
params={"filter": "instore", "loyaltyNumber": LOYALTY},
timeout=30,
)
print(r.status_code)
print(r.text[:500])


@@ -1,17 +1,27 @@
import unittest
import browser_cookie3
from curl_cffi import requests
BASE = "https://giantfood.com"
ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store"
try:
import browser_cookie3 # noqa: F401
from curl_cffi import requests # noqa: F401
except ImportError as exc: # pragma: no cover - dependency-gated smoke test
browser_cookie3 = None
_IMPORT_ERROR = exc
else:
_IMPORT_ERROR = None
USER_ID = "369513017"
LOYALTY = "440155630880"
s = requests.Session()
s.cookies.update(browser_cookie3.firefox(domain_name="giantfood.com"))
s.headers.update({
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) Gecko/20100101 Firefox/148.0",
"accept": "application/json, text/plain, */*",
"accept-language": "en-US,en;q=0.9",
"referer": ACCOUNT_PAGE,
})
@unittest.skipIf(browser_cookie3 is None, f"optional smoke test dependency missing: {_IMPORT_ERROR}")
class CurlCffiSmokeTest(unittest.TestCase):
def test_dependencies_available(self):
self.assertIsNotNone(browser_cookie3)
r = s.get(
f"{BASE}/api/v6.0/user/{USER_ID}/order/history",
params={"filter": "instore", "loyaltyNumber": LOYALTY},
impersonate="firefox",
timeout=30,
)
print(r.status_code)
print(r.text[:500])


@@ -1,155 +0,0 @@
import sqlite3
import tempfile
import unittest
from pathlib import Path
from unittest import mock
import browser_session
import scrape_costco
class BrowserSessionTests(unittest.TestCase):
def test_read_firefox_local_storage_reads_copied_sqlite(self):
with tempfile.TemporaryDirectory() as tmpdir:
profile_dir = Path(tmpdir) / "abcd.default-release"
ls_dir = profile_dir / "storage" / "default" / "https+++www.costco.com" / "ls"
ls_dir.mkdir(parents=True)
db_path = ls_dir / "data.sqlite"
with sqlite3.connect(db_path) as connection:
connection.execute("CREATE TABLE data (key TEXT, value TEXT)")
connection.execute(
"INSERT INTO data (key, value) VALUES (?, ?)",
("costco-x-wcs-clientId", "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf"),
)
values = browser_session.read_firefox_local_storage(
profile_dir,
origin_filter="costco.com",
)
self.assertEqual(
"4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",
values["costco-x-wcs-clientId"],
)
def test_load_costco_browser_headers_reads_id_token_and_client_id(self):
with tempfile.TemporaryDirectory() as tmpdir:
profile_dir = Path(tmpdir)
storage_dir = profile_dir / "storage" / "default" / "https+++www.costco.com" / "ls"
storage_dir.mkdir(parents=True)
db_path = storage_dir / "data.sqlite"
with sqlite3.connect(db_path) as connection:
connection.execute("CREATE TABLE data (key TEXT, value TEXT)")
connection.execute(
"INSERT INTO data (key, value) VALUES (?, ?)",
("idToken", "header.payload.signature"),
)
connection.execute(
"INSERT INTO data (key, value) VALUES (?, ?)",
("clientID", "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf"),
)
headers = scrape_costco.load_costco_browser_headers(
profile_dir,
authorization="",
client_id="",
client_identifier="481b1aec-aa3b-454b-b81b-48187e28f205",
)
self.assertEqual("Bearer header.payload.signature", headers["costco-x-authorization"])
self.assertEqual(
"4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",
headers["costco-x-wcs-clientId"],
)
self.assertEqual(
"481b1aec-aa3b-454b-b81b-48187e28f205",
headers["client-identifier"],
)
def test_load_costco_browser_headers_prefers_env_values(self):
with tempfile.TemporaryDirectory() as tmpdir:
profile_dir = Path(tmpdir)
storage_dir = profile_dir / "storage" / "default" / "https+++www.costco.com" / "ls"
storage_dir.mkdir(parents=True)
db_path = storage_dir / "data.sqlite"
with sqlite3.connect(db_path) as connection:
connection.execute("CREATE TABLE data (key TEXT, value TEXT)")
connection.execute(
"INSERT INTO data (key, value) VALUES (?, ?)",
("idToken", "storage.payload.signature"),
)
connection.execute(
"INSERT INTO data (key, value) VALUES (?, ?)",
("clientID", "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf"),
)
headers = scrape_costco.load_costco_browser_headers(
profile_dir,
authorization="Bearer env.payload.signature",
client_id="env-client-id",
client_identifier="481b1aec-aa3b-454b-b81b-48187e28f205",
)
self.assertEqual("Bearer env.payload.signature", headers["costco-x-authorization"])
self.assertEqual("env-client-id", headers["costco-x-wcs-clientId"])
def test_scrape_costco_prompts_for_profile_dir_when_autodiscovery_fails(self):
with mock.patch.object(
scrape_costco,
"find_firefox_profile_dir",
side_effect=FileNotFoundError("no default profile"),
), mock.patch.object(
scrape_costco.click,
"prompt",
return_value=Path("/tmp/profile"),
) as mocked_prompt, mock.patch.object(
scrape_costco,
"load_config",
return_value={
"authorization": "",
"client_id": "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",
"client_identifier": "481b1aec-aa3b-454b-b81b-48187e28f205",
},
), mock.patch.object(
scrape_costco,
"load_costco_browser_headers",
return_value={
"costco-x-authorization": "Bearer header.payload.signature",
"costco-x-wcs-clientId": "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",
"client-identifier": "481b1aec-aa3b-454b-b81b-48187e28f205",
},
), mock.patch.object(
scrape_costco,
"build_session",
return_value=object(),
), mock.patch.object(
scrape_costco,
"fetch_summary_windows",
return_value=(
{"data": {"receiptsWithCounts": {"receipts": []}}},
[],
),
), mock.patch.object(
scrape_costco,
"write_json",
), mock.patch.object(
scrape_costco,
"write_csv",
):
scrape_costco.main.callback(
outdir="/tmp/costco_output",
document_type="all",
document_sub_type="all",
window_days=92,
months_back=3,
firefox_profile_dir=None,
)
mocked_prompt.assert_called_once()
if __name__ == "__main__":
unittest.main()


@@ -1,99 +0,0 @@
import unittest
import build_canonical_layer
class CanonicalLayerTests(unittest.TestCase):
def test_build_canonical_layer_auto_links_exact_upc_and_name_size(self):
observed_rows = [
{
"observed_product_id": "gobs_1",
"representative_upc": "111",
"representative_retailer_item_id": "11",
"representative_name_norm": "GALA APPLE",
"representative_brand": "SB",
"representative_variant": "",
"representative_size_value": "5",
"representative_size_unit": "lb",
"representative_pack_qty": "",
"representative_measure_type": "weight",
"is_fee": "false",
"is_discount_line": "false",
"is_coupon_line": "false",
},
{
"observed_product_id": "gobs_2",
"representative_upc": "111",
"representative_retailer_item_id": "12",
"representative_name_norm": "LARGE WHITE EGGS",
"representative_brand": "SB",
"representative_variant": "",
"representative_size_value": "",
"representative_size_unit": "",
"representative_pack_qty": "18",
"representative_measure_type": "count",
"is_fee": "false",
"is_discount_line": "false",
"is_coupon_line": "false",
},
{
"observed_product_id": "gobs_3",
"representative_upc": "",
"representative_retailer_item_id": "21",
"representative_name_norm": "ROTINI",
"representative_brand": "",
"representative_variant": "",
"representative_size_value": "16",
"representative_size_unit": "oz",
"representative_pack_qty": "",
"representative_measure_type": "weight",
"is_fee": "false",
"is_discount_line": "false",
"is_coupon_line": "false",
},
{
"observed_product_id": "gobs_4",
"representative_upc": "",
"representative_retailer_item_id": "22",
"representative_name_norm": "ROTINI",
"representative_brand": "SB",
"representative_variant": "",
"representative_size_value": "16",
"representative_size_unit": "oz",
"representative_pack_qty": "",
"representative_measure_type": "weight",
"is_fee": "false",
"is_discount_line": "false",
"is_coupon_line": "false",
},
{
"observed_product_id": "gobs_5",
"representative_upc": "",
"representative_retailer_item_id": "99",
"representative_name_norm": "GL BAG CHARGE",
"representative_brand": "",
"representative_variant": "",
"representative_size_value": "",
"representative_size_unit": "",
"representative_pack_qty": "",
"representative_measure_type": "each",
"is_fee": "true",
"is_discount_line": "false",
"is_coupon_line": "false",
},
]
canonicals, links = build_canonical_layer.build_canonical_layer(observed_rows)
self.assertEqual(2, len(canonicals))
self.assertEqual(4, len(links))
methods = {row["observed_product_id"]: row["link_method"] for row in links}
self.assertEqual("exact_upc", methods["gobs_1"])
self.assertEqual("exact_upc", methods["gobs_2"])
self.assertEqual("exact_name_size", methods["gobs_3"])
self.assertEqual("exact_name_size", methods["gobs_4"])
self.assertNotIn("gobs_5", methods)
if __name__ == "__main__":
unittest.main()


@@ -1,460 +0,0 @@
import csv
import json
import tempfile
import unittest
from pathlib import Path
from unittest import mock
import enrich_costco
import scrape_costco
import validate_cross_retailer_flow
class CostcoPipelineTests(unittest.TestCase):
def test_resolve_date_range_uses_months_back(self):
start_date, end_date = scrape_costco.resolve_date_range(
3, today=scrape_costco.parse_cli_date("3/16/2026")
)
self.assertEqual("12/16/2025", start_date)
self.assertEqual("3/16/2026", end_date)
def test_build_date_windows_splits_long_ranges(self):
windows = scrape_costco.build_date_windows("1/01/2026", "6/30/2026", 92)
self.assertEqual(
[
{"startDate": "1/01/2026", "endDate": "4/02/2026"},
{"startDate": "4/03/2026", "endDate": "6/30/2026"},
],
windows,
)

    def test_fetch_summary_windows_records_metadata_and_warns_on_mismatch(self):
        payloads = [
            {
                "data": {
                    "receiptsWithCounts": {
                        "inWarehouse": 2,
                        "gasStation": 0,
                        "carWash": 0,
                        "gasAndCarWash": 0,
                        "receipts": [
                            {
                                "transactionBarcode": "abc",
                                "receiptType": "In-Warehouse",
                            }
                        ],
                    }
                }
            },
            {
                "data": {
                    "receiptsWithCounts": {
                        "inWarehouse": 1,
                        "gasStation": 0,
                        "carWash": 0,
                        "gasAndCarWash": 0,
                        "receipts": [
                            {
                                "transactionBarcode": "def",
                                "receiptType": "In-Warehouse",
                            }
                        ],
                    }
                }
            },
        ]
        with mock.patch.object(
            scrape_costco, "graphql_post", side_effect=payloads
        ) as mocked_post, mock.patch.object(scrape_costco.click, "echo") as mocked_echo:
            summary_payload, metadata = scrape_costco.fetch_summary_windows(
                session=object(),
                start_date="1/01/2026",
                end_date="6/30/2026",
                document_type="all",
                document_sub_type="all",
                window_days=92,
            )
        self.assertEqual(2, mocked_post.call_count)
        self.assertEqual(2, len(metadata))
        self.assertTrue(metadata[0]["countMismatch"])
        self.assertFalse(metadata[1]["countMismatch"])
        self.assertEqual("1/01/2026", metadata[0]["startDate"])
        self.assertEqual("4/03/2026", metadata[1]["startDate"])
        self.assertEqual(
            ["abc", "def"],
            [
                row["transactionBarcode"]
                for row in scrape_costco.summary_receipts(summary_payload)
            ],
        )
        mocked_echo.assert_called_once()
        warning_text = mocked_echo.call_args.args[0]
        self.assertIn("warning: summary count mismatch", warning_text)

    def test_flatten_costco_data_preserves_discount_rows(self):
        summary_payload = {
            "data": {
                "receiptsWithCounts": {
                    "receipts": [
                        {
                            "transactionBarcode": "abc",
                            "tenderArray": [{"tenderDescription": "VISA"}],
                            "couponArray": [{"upcnumberCoupon": "2100003746641"}],
                        }
                    ]
                }
            }
        }
        detail_payloads = [
            {
                "data": {
                    "receiptsWithCounts": {
                        "receipts": [
                            {
                                "transactionBarcode": "abc",
                                "transactionDate": "2026-03-12",
                                "receiptType": "In-Warehouse",
                                "total": 10.0,
                                "totalItemCount": 2,
                                "instantSavings": 5.0,
                                "warehouseName": "MT VERNON",
                                "warehouseNumber": 1115,
                                "warehouseAddress1": "7940 RICHMOND HWY",
                                "warehouseCity": "ALEXANDRIA",
                                "warehouseState": "VA",
                                "warehousePostalCode": "22306",
                                "itemArray": [
                                    {
                                        "itemNumber": "4873222",
                                        "itemDescription01": "ALL F&C",
                                        "itemDescription02": "200OZ 160LOADS P104",
                                        "itemDepartmentNumber": 14,
                                        "transDepartmentNumber": 14,
                                        "unit": 1,
                                        "itemIdentifier": "E",
                                        "amount": 19.99,
                                        "itemUnitPriceAmount": 19.99,
                                    },
                                    {
                                        "itemNumber": "374664",
                                        "itemDescription01": "/ 4873222",
                                        "itemDescription02": None,
                                        "itemDepartmentNumber": 14,
                                        "transDepartmentNumber": 14,
                                        "unit": -1,
                                        "itemIdentifier": None,
                                        "amount": -5,
                                        "itemUnitPriceAmount": 0,
                                    },
                                ],
                            }
                        ]
                    }
                }
            }
        ]
        orders, items = scrape_costco.flatten_costco_data(
            summary_payload, detail_payloads, Path("costco_output/raw")
        )
        self.assertEqual(1, len(orders))
        self.assertEqual(2, len(items))
        self.assertEqual("false", items[0]["is_discount_line"])
        self.assertEqual("true", items[1]["is_discount_line"])
        self.assertEqual("true", items[1]["is_coupon_line"])

    def test_flatten_costco_data_uses_composite_summary_lookup_key(self):
        summary_payload = {
            "data": {
                "receiptsWithCounts": {
                    "receipts": [
                        {
                            "transactionBarcode": "dup",
                            "transactionDateTime": "2026-03-12T16:16:00",
                            "tenderArray": [{"tenderDescription": "VISA"}],
                            "couponArray": [{"upcnumberCoupon": "111"}],
                        },
                        {
                            "transactionBarcode": "dup",
                            "transactionDateTime": "2026-02-14T16:25:00",
                            "tenderArray": [{"tenderDescription": "MASTERCARD"}],
                            "couponArray": [],
                        },
                    ]
                }
            }
        }
        detail_payloads = [
            {
                "data": {
                    "receiptsWithCounts": {
                        "receipts": [
                            {
                                "transactionBarcode": "dup",
                                "transactionDateTime": "2026-03-12T16:16:00",
                                "transactionDate": "2026-03-12",
                                "receiptType": "In-Warehouse",
                                "total": 10.0,
                                "totalItemCount": 1,
                                "instantSavings": 5.0,
                                "warehouseName": "MT VERNON",
                                "warehouseNumber": 1115,
                                "warehouseAddress1": "7940 RICHMOND HWY",
                                "warehouseCity": "ALEXANDRIA",
                                "warehouseState": "VA",
                                "warehousePostalCode": "22306",
                                "itemArray": [
                                    {
                                        "itemNumber": "111",
                                        "itemDescription01": "/ 111",
                                        "itemDescription02": None,
                                        "itemDepartmentNumber": 14,
                                        "transDepartmentNumber": 14,
                                        "unit": -1,
                                        "itemIdentifier": None,
                                        "amount": -5,
                                        "itemUnitPriceAmount": 0,
                                    }
                                ],
                            }
                        ]
                    }
                }
            }
        ]
        orders, items = scrape_costco.flatten_costco_data(
            summary_payload, detail_payloads, Path("costco_output/raw")
        )
        self.assertEqual("VISA", orders[0]["payment_method"])
        self.assertEqual("true", items[0]["is_coupon_line"])
        self.assertIn("dup-2026-03-12T16-16-00.json", items[0]["raw_order_path"])

    def test_costco_enricher_parses_size_pack_and_discount(self):
        row = enrich_costco.parse_costco_item(
            order_id="abc",
            order_date="2026-03-12",
            raw_path=Path("costco_output/raw/abc.json"),
            line_no=1,
            item={
                "itemNumber": "60357",
                "itemDescription01": "MIXED PEPPER",
                "itemDescription02": "6-PACK",
                "itemDepartmentNumber": 65,
                "transDepartmentNumber": 65,
                "unit": 1,
                "itemIdentifier": "E",
                "amount": 7.49,
                "itemUnitPriceAmount": 7.49,
            },
        )
        self.assertEqual("60357", row["retailer_item_id"])
        self.assertEqual("MIXED PEPPER", row["item_name_norm"])
        self.assertEqual("6", row["pack_qty"])
        self.assertEqual("count", row["measure_type"])
        discount = enrich_costco.parse_costco_item(
            order_id="abc",
            order_date="2026-03-12",
            raw_path=Path("costco_output/raw/abc.json"),
            line_no=2,
            item={
                "itemNumber": "374664",
                "itemDescription01": "/ 4873222",
                "itemDescription02": None,
                "itemDepartmentNumber": 14,
                "transDepartmentNumber": 14,
                "unit": -1,
                "itemIdentifier": None,
                "amount": -5,
                "itemUnitPriceAmount": 0,
            },
        )
        self.assertEqual("true", discount["is_discount_line"])
        self.assertEqual("true", discount["is_coupon_line"])

    def test_cross_retailer_validation_writes_proof_example(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            giant_csv = Path(tmpdir) / "giant_items_enriched.csv"
            costco_csv = Path(tmpdir) / "costco_items_enriched.csv"
            outdir = Path(tmpdir) / "combined"
            fieldnames = enrich_costco.OUTPUT_FIELDS
            giant_row = {field: "" for field in fieldnames}
            giant_row.update(
                {
                    "retailer": "giant",
                    "order_id": "g1",
                    "line_no": "1",
                    "order_date": "2026-03-01",
                    "retailer_item_id": "100",
                    "item_name": "FRESH BANANA",
                    "item_name_norm": "BANANA",
                    "upc": "4011",
                    "measure_type": "weight",
                    "is_store_brand": "false",
                    "is_fee": "false",
                    "is_discount_line": "false",
                    "is_coupon_line": "false",
                    "line_total": "1.29",
                }
            )
            costco_row = {field: "" for field in fieldnames}
            costco_row.update(
                {
                    "retailer": "costco",
                    "order_id": "c1",
                    "line_no": "1",
                    "order_date": "2026-03-12",
                    "retailer_item_id": "30669",
                    "item_name": "BANANAS 3 LB / 1.36 KG",
                    "item_name_norm": "BANANA",
                    "upc": "",
                    "size_value": "3",
                    "size_unit": "lb",
                    "measure_type": "weight",
                    "is_store_brand": "false",
                    "is_fee": "false",
                    "is_discount_line": "false",
                    "is_coupon_line": "false",
                    "line_total": "2.98",
                }
            )
            with giant_csv.open("w", newline="", encoding="utf-8") as handle:
                writer = csv.DictWriter(handle, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerow(giant_row)
            with costco_csv.open("w", newline="", encoding="utf-8") as handle:
                writer = csv.DictWriter(handle, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerow(costco_row)
            validate_cross_retailer_flow.main.callback(
                giant_items_enriched_csv=str(giant_csv),
                costco_items_enriched_csv=str(costco_csv),
                outdir=str(outdir),
            )
            proof_path = outdir / "proof_examples.csv"
            self.assertTrue(proof_path.exists())
            with proof_path.open(newline="", encoding="utf-8") as handle:
                rows = list(csv.DictReader(handle))
            self.assertEqual(1, len(rows))
            self.assertEqual("banana", rows[0]["proof_name"])

    def test_main_writes_summary_request_metadata(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            outdir = Path(tmpdir) / "costco_output"
            summary_payload = {
                "data": {
                    "receiptsWithCounts": {
                        "inWarehouse": 1,
                        "gasStation": 0,
                        "carWash": 0,
                        "gasAndCarWash": 0,
                        "receipts": [
                            {
                                "transactionBarcode": "abc",
                                "receiptType": "In-Warehouse",
                                "tenderArray": [],
                                "couponArray": [],
                            }
                        ],
                    }
                }
            }
            detail_payload = {
                "data": {
                    "receiptsWithCounts": {
                        "receipts": [
                            {
                                "transactionBarcode": "abc",
                                "transactionDate": "2026-03-12",
                                "receiptType": "In-Warehouse",
                                "total": 10.0,
                                "totalItemCount": 1,
                                "instantSavings": 0,
                                "warehouseName": "MT VERNON",
                                "warehouseNumber": 1115,
                                "warehouseAddress1": "7940 RICHMOND HWY",
                                "warehouseCity": "ALEXANDRIA",
                                "warehouseState": "VA",
                                "warehousePostalCode": "22306",
                                "itemArray": [],
                            }
                        ]
                    }
                }
            }
            metadata = [
                {
                    "startDate": "1/01/2026",
                    "endDate": "3/31/2026",
                    "text": "custom",
                    "documentType": "all",
                    "documentSubType": "all",
                    "returnedReceipts": 1,
                    "returnedInWarehouseReceipts": 1,
                    "inWarehouse": 1,
                    "gasStation": 0,
                    "carWash": 0,
                    "gasAndCarWash": 0,
                    "countMismatch": False,
                }
            ]
            with mock.patch.object(
                scrape_costco,
                "load_config",
                return_value={
                    "authorization": "",
                    "client_id": "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",
                    "client_identifier": "481b1aec-aa3b-454b-b81b-48187e28f205",
                },
            ), mock.patch.object(
                scrape_costco,
                "find_firefox_profile_dir",
                return_value=Path("/tmp/profile"),
            ), mock.patch.object(
                scrape_costco,
                "load_costco_browser_headers",
                return_value={
                    "costco-x-authorization": "Bearer header.payload.signature",
                    "costco-x-wcs-clientId": "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",
                    "client-identifier": "481b1aec-aa3b-454b-b81b-48187e28f205",
                },
            ), mock.patch.object(
                scrape_costco, "build_session", return_value=object()
            ), mock.patch.object(
                scrape_costco,
                "fetch_summary_windows",
                return_value=(summary_payload, metadata),
            ), mock.patch.object(
                scrape_costco,
                "graphql_post",
                return_value=detail_payload,
            ):
                scrape_costco.main.callback(
                    outdir=str(outdir),
                    document_type="all",
                    document_sub_type="all",
                    window_days=92,
                    months_back=3,
                    firefox_profile_dir=None,
                )
            metadata_path = outdir / "raw" / "summary_requests.json"
            self.assertTrue(metadata_path.exists())
            saved_metadata = json.loads(metadata_path.read_text(encoding="utf-8"))
            self.assertEqual(metadata, saved_metadata)


if __name__ == "__main__":
    unittest.main()


@@ -1,191 +0,0 @@
import csv
import json
import tempfile
import unittest
from pathlib import Path

import enrich_giant


class EnrichGiantTests(unittest.TestCase):
    def test_parse_size_and_pack_handles_pack_and_weight_tokens(self):
        size_value, size_unit, pack_qty = enrich_giant.parse_size_and_pack(
            "COKE CHERRY 6PK 7.5Z"
        )
        self.assertEqual("7.5", size_value)
        self.assertEqual("oz", size_unit)
        self.assertEqual("6", pack_qty)

    def test_parse_item_marks_store_brand_fee_and_weight_prices(self):
        row = enrich_giant.parse_item(
            order_id="abc123",
            order_date="2026-03-01",
            raw_path=Path("raw/abc123.json"),
            line_no=1,
            item={
                "podId": 1,
                "shipQy": 1,
                "totalPickedWeight": 2,
                "unitPrice": 3.98,
                "itemName": "+SB GALA APPLE 5 LB",
                "lbEachCd": "LB",
                "groceryAmount": 3.98,
                "primUpcCd": "111",
                "mvpSavings": 0,
                "rewardSavings": 0,
                "couponSavings": 0,
                "couponPrice": 0,
                "categoryId": "1",
                "categoryDesc": "Grocery",
                "image": {"large": "https://example.test/apple.jpg"},
            },
        )
        self.assertEqual("SB", row["brand_guess"])
        self.assertEqual("GALA APPLE", row["item_name_norm"])
        self.assertEqual("5", row["size_value"])
        self.assertEqual("lb", row["size_unit"])
        self.assertEqual("weight", row["measure_type"])
        self.assertEqual("true", row["is_store_brand"])
        self.assertEqual("1.99", row["price_per_lb"])
        self.assertEqual("0.1244", row["price_per_oz"])
        self.assertEqual("https://example.test/apple.jpg", row["image_url"])
        fee_row = enrich_giant.parse_item(
            order_id="abc123",
            order_date="2026-03-01",
            raw_path=Path("raw/abc123.json"),
            line_no=2,
            item={
                "podId": 2,
                "shipQy": 1,
                "totalPickedWeight": 0,
                "unitPrice": 0.05,
                "itemName": "GL BAG CHARGE",
                "lbEachCd": "EA",
                "groceryAmount": 0.05,
                "primUpcCd": "",
                "mvpSavings": 0,
                "rewardSavings": 0,
                "couponSavings": 0,
                "couponPrice": 0,
                "categoryId": "1",
                "categoryDesc": "Grocery",
            },
        )
        self.assertEqual("true", fee_row["is_fee"])
        self.assertEqual("GL BAG CHARGE", fee_row["item_name_norm"])

    def test_parse_item_derives_packaged_weight_prices_from_size_tokens(self):
        row = enrich_giant.parse_item(
            order_id="abc123",
            order_date="2026-03-01",
            raw_path=Path("raw/abc123.json"),
            line_no=1,
            item={
                "podId": 1,
                "shipQy": 2,
                "totalPickedWeight": 0,
                "unitPrice": 3.0,
                "itemName": "PEPSI 6PK 7.5Z",
                "lbEachCd": "EA",
                "groceryAmount": 6.0,
                "primUpcCd": "111",
                "mvpSavings": 0,
                "rewardSavings": 0,
                "couponSavings": 0,
                "couponPrice": 0,
                "categoryId": "1",
                "categoryDesc": "Grocery",
            },
        )
        self.assertEqual("weight", row["measure_type"])
        self.assertEqual("6", row["pack_qty"])
        self.assertEqual("7.5", row["size_value"])
        self.assertEqual("0.0667", row["price_per_oz"])
        self.assertEqual("1.0667", row["price_per_lb"])

    def test_build_items_enriched_reads_raw_order_files_and_writes_csv(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            raw_dir = Path(tmpdir) / "raw"
            raw_dir.mkdir()
            (raw_dir / "history.json").write_text("{}", encoding="utf-8")
            (raw_dir / "order-2.json").write_text(
                json.dumps(
                    {
                        "orderId": "order-2",
                        "orderDate": "2026-03-02",
                        "items": [
                            {
                                "podId": 20,
                                "shipQy": 1,
                                "totalPickedWeight": 0,
                                "unitPrice": 2.99,
                                "itemName": "SB ROTINI 16Z",
                                "lbEachCd": "EA",
                                "groceryAmount": 2.99,
                                "primUpcCd": "222",
                                "mvpSavings": 0,
                                "rewardSavings": 0,
                                "couponSavings": 0,
                                "couponPrice": 0,
                                "categoryId": "1",
                                "categoryDesc": "Grocery",
                                "image": {"small": "https://example.test/rotini.jpg"},
                            }
                        ],
                    }
                ),
                encoding="utf-8",
            )
            (raw_dir / "order-1.json").write_text(
                json.dumps(
                    {
                        "orderId": "order-1",
                        "orderDate": "2026-03-01",
                        "items": [
                            {
                                "podId": 10,
                                "shipQy": 2,
                                "totalPickedWeight": 0,
                                "unitPrice": 1.5,
                                "itemName": "PEPSI 6PK 7.5Z",
                                "lbEachCd": "EA",
                                "groceryAmount": 3.0,
                                "primUpcCd": "111",
                                "mvpSavings": 0,
                                "rewardSavings": 0,
                                "couponSavings": 0,
                                "couponPrice": 0,
                                "categoryId": "1",
                                "categoryDesc": "Grocery",
                            }
                        ],
                    }
                ),
                encoding="utf-8",
            )
            rows = enrich_giant.build_items_enriched(raw_dir)
            output_csv = Path(tmpdir) / "items_enriched.csv"
            enrich_giant.write_csv(output_csv, rows)
            self.assertEqual(["order-1", "order-2"], [row["order_id"] for row in rows])
            self.assertEqual("PEPSI", rows[0]["item_name_norm"])
            self.assertEqual("6", rows[0]["pack_qty"])
            self.assertEqual("7.5", rows[0]["size_value"])
            self.assertEqual("10", rows[0]["retailer_item_id"])
            self.assertEqual("true", rows[1]["is_store_brand"])
            with output_csv.open(newline="", encoding="utf-8") as handle:
                written_rows = list(csv.DictReader(handle))
            self.assertEqual(2, len(written_rows))
            self.assertEqual(enrich_giant.OUTPUT_FIELDS, list(written_rows[0].keys()))
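            # Hedged note: history.json is skipped as a non-order payload, and
            # the enriched rows come back ordered order-1 before order-2 even
            # though order-2.json was written first, which suggests a
            # deterministic sort on order id/date.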

if __name__ == "__main__":
    unittest.main()


@@ -1,17 +1,66 @@
import unittest

BASE = "https://giantfood.com"
ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store"
USER_ID = "369513017"
LOYALTY = "440155630880"

try:
    from playwright.sync_api import sync_playwright  # noqa: F401
    import requests  # noqa: F401
except ImportError as exc:  # pragma: no cover - dependency-gated smoke test
    sync_playwright = None
    _IMPORT_ERROR = exc
else:
    _IMPORT_ERROR = None


def get_session():
    with sync_playwright() as p:
        browser = p.firefox.launch(headless=False)
        page = browser.new_page()
        page.goto(ACCOUNT_PAGE)
        print("log in manually in the browser, then press ENTER here")
        input()
        cookies = page.context.cookies()
        ua = page.evaluate("() => navigator.userAgent")
        browser.close()
        s = requests.Session()
        s.headers.update({
            "user-agent": ua,
            "accept": "application/json, text/plain, */*",
            "referer": ACCOUNT_PAGE,
        })
        for c in cookies:
            domain = c.get("domain", "").lstrip(".") or "giantfood.com"
            s.cookies.set(c["name"], c["value"], domain=domain)
        return s


@unittest.skipIf(sync_playwright is None, f"optional smoke test dependency missing: {_IMPORT_ERROR}")
class GiantLoginSmokeTest(unittest.TestCase):
    def test_dependencies_available(self):
        self.assertIsNotNone(sync_playwright)


def test_history(session):
    url = f"{BASE}/api/v6.0/user/{USER_ID}/order/history"
    r = session.get(
        url,
        params={
            "filter": "instore",
            "loyaltyNumber": LOYALTY,
        },
    )
    print("status:", r.status_code)
    print()
    data = r.json()
    print("orders found:", len(data.get("records", [])))
    print()
    for rec in data.get("records", [])[:5]:
        print(rec["orderId"], rec["orderDate"], rec["orderTotal"])


if __name__ == "__main__":
    session = get_session()
    test_history(session)


@@ -1,67 +0,0 @@
import unittest

import build_observed_products


class ObservedProductTests(unittest.TestCase):
    def test_build_observed_products_aggregates_rows_with_same_key(self):
        rows = [
            {
                "retailer": "giant",
                "order_id": "1",
                "line_no": "1",
                "order_date": "2026-01-01",
                "item_name": "SB GALA APPLE 5LB",
                "item_name_norm": "GALA APPLE",
                "retailer_item_id": "11",
                "upc": "111",
                "brand_guess": "SB",
                "variant": "",
                "size_value": "5",
                "size_unit": "lb",
                "pack_qty": "",
                "measure_type": "weight",
                "image_url": "https://example.test/a.jpg",
                "is_store_brand": "true",
                "is_fee": "false",
                "is_discount_line": "false",
                "is_coupon_line": "false",
                "line_total": "7.99",
            },
            {
                "retailer": "giant",
                "order_id": "2",
                "line_no": "1",
                "order_date": "2026-01-10",
                "item_name": "SB GALA APPLE 5 LB",
                "item_name_norm": "GALA APPLE",
                "retailer_item_id": "11",
                "upc": "111",
                "brand_guess": "SB",
                "variant": "",
                "size_value": "5",
                "size_unit": "lb",
                "pack_qty": "",
                "measure_type": "weight",
                "image_url": "",
                "is_store_brand": "true",
                "is_fee": "false",
                "is_discount_line": "false",
                "is_coupon_line": "false",
                "line_total": "8.49",
            },
        ]
        observed = build_observed_products.build_observed_products(rows)
        self.assertEqual(1, len(observed))
        self.assertEqual("2", observed[0]["times_seen"])
        self.assertEqual("2026-01-01", observed[0]["first_seen_date"])
        self.assertEqual("2026-01-10", observed[0]["last_seen_date"])
        self.assertEqual("11", observed[0]["representative_retailer_item_id"])
        self.assertEqual("111", observed[0]["representative_upc"])
        self.assertIn("SB GALA APPLE 5LB", observed[0]["raw_name_examples"])

if __name__ == "__main__":
    unittest.main()


@@ -1,133 +0,0 @@
import tempfile
import unittest
from pathlib import Path

import build_observed_products
import build_review_queue
from layer_helpers import write_csv_rows


class ReviewQueueTests(unittest.TestCase):
    def test_build_review_queue_preserves_existing_status(self):
        observed_rows = [
            {
                "observed_product_id": "gobs_1",
                "retailer": "giant",
                "representative_upc": "111",
                "representative_image_url": "",
                "representative_name_norm": "GALA APPLE",
                "times_seen": "2",
                "distinct_item_names_count": "2",
                "distinct_upcs_count": "1",
                "is_fee": "false",
                "is_discount_line": "false",
                "is_coupon_line": "false",
            }
        ]
        item_rows = [
            {
                "observed_product_id": "gobs_1",
                "item_name": "SB GALA APPLE 5LB",
                "item_name_norm": "GALA APPLE",
                "line_total": "7.99",
            },
            {
                "observed_product_id": "gobs_1",
                "item_name": "SB GALA APPLE 5 LB",
                "item_name_norm": "GALA APPLE",
                "line_total": "8.49",
            },
        ]
        existing = {
            build_review_queue.stable_id("rvw", "gobs_1|missing_image"): {
                "status": "approved",
                "resolution_notes": "looked fine",
                "created_at": "2026-03-15",
            }
        }
        queue = build_review_queue.build_review_queue(
            observed_rows, item_rows, existing, "2026-03-16"
        )
        self.assertEqual(2, len(queue))
        missing_image = [
            row for row in queue if row["reason_code"] == "missing_image"
        ][0]
        self.assertEqual("approved", missing_image["status"])
        self.assertEqual("looked fine", missing_image["resolution_notes"])

    def test_review_queue_main_writes_output(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            observed_path = Path(tmpdir) / "products_observed.csv"
            items_path = Path(tmpdir) / "items_enriched.csv"
            output_path = Path(tmpdir) / "review_queue.csv"
            observed_rows = [
                {
                    "observed_product_id": "gobs_1",
                    "retailer": "giant",
                    "observed_key": "giant|upc=111|name=GALA APPLE",
                    "representative_retailer_item_id": "11",
                    "representative_upc": "111",
                    "representative_item_name": "SB GALA APPLE 5LB",
                    "representative_name_norm": "GALA APPLE",
                    "representative_brand": "SB",
                    "representative_variant": "",
                    "representative_size_value": "5",
                    "representative_size_unit": "lb",
                    "representative_pack_qty": "",
                    "representative_measure_type": "weight",
                    "representative_image_url": "",
                    "is_store_brand": "true",
                    "is_fee": "false",
                    "is_discount_line": "false",
                    "is_coupon_line": "false",
                    "first_seen_date": "2026-01-01",
                    "last_seen_date": "2026-01-10",
                    "times_seen": "2",
                    "example_order_id": "1",
                    "example_item_name": "SB GALA APPLE 5LB",
                    "raw_name_examples": "SB GALA APPLE 5LB | SB GALA APPLE 5 LB",
                    "normalized_name_examples": "GALA APPLE",
                    "example_prices": "7.99 | 8.49",
                    "distinct_item_names_count": "2",
                    "distinct_retailer_item_ids_count": "1",
                    "distinct_upcs_count": "1",
                }
            ]
            item_rows = [
                {
                    "retailer": "giant",
                    "order_id": "1",
                    "line_no": "1",
                    "item_name": "SB GALA APPLE 5LB",
                    "item_name_norm": "GALA APPLE",
                    "retailer_item_id": "11",
                    "upc": "111",
                    "size_value": "5",
                    "size_unit": "lb",
                    "pack_qty": "",
                    "measure_type": "weight",
                    "is_store_brand": "true",
                    "is_fee": "false",
                    "is_discount_line": "false",
                    "is_coupon_line": "false",
                    "line_total": "7.99",
                }
            ]
            write_csv_rows(
                observed_path, observed_rows, build_observed_products.OUTPUT_FIELDS
            )
            write_csv_rows(items_path, item_rows, list(item_rows[0].keys()))
            build_review_queue.main.callback(
                observed_csv=str(observed_path),
                items_enriched_csv=str(items_path),
                output_csv=str(output_path),
            )
            self.assertTrue(output_path.exists())


if __name__ == "__main__":
    unittest.main()


@@ -1,117 +0,0 @@
import csv
import tempfile
import unittest
from pathlib import Path

import scraper


class ScraperTests(unittest.TestCase):
    def test_flatten_orders_extracts_order_and_item_rows(self):
        history = {
            "records": [
                {
                    "orderId": "abc123",
                    "serviceType": "PICKUP",
                }
            ]
        }
        details = [
            {
                "orderId": "abc123",
                "orderDate": "2026-03-01",
                "deliveryDate": "2026-03-02",
                "orderTotal": "12.34",
                "paymentMethod": "VISA",
                "totalItemCount": 1,
                "totalSavings": "1.00",
                "yourSavingsTotal": "1.00",
                "couponsDiscountsTotal": "0.50",
                "refundOrder": False,
                "ebtOrder": False,
                "pup": {
                    "storeName": "Giant",
                    "aholdStoreNumber": "42",
                    "storeAddress1": "123 Main",
                    "storeCity": "Springfield",
                    "storeState": "VA",
                    "storeZipcode": "22150",
                },
                "items": [
                    {
                        "podId": "pod-1",
                        "itemName": "Bananas",
                        "primUpcCd": "111",
                        "categoryId": "produce",
                        "categoryDesc": "Produce",
                        "shipQy": "2",
                        "lbEachCd": "EA",
                        "unitPrice": "0.59",
                        "groceryAmount": "1.18",
                        "totalPickedWeight": "",
                        "mvpSavings": "0.10",
                        "rewardSavings": "0.00",
                        "couponSavings": "0.00",
                        "couponPrice": "",
                    }
                ],
            }
        ]
        orders, items = scraper.flatten_orders(history, details)
        self.assertEqual(1, len(orders))
        self.assertEqual("abc123", orders[0]["order_id"])
        self.assertEqual("PICKUP", orders[0]["service_type"])
        self.assertEqual(1, len(items))
        self.assertEqual("1", items[0]["line_no"])
        self.assertEqual("Bananas", items[0]["item_name"])

    def test_append_dedup_replaces_duplicate_rows_and_preserves_new_values(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            path = Path(tmpdir) / "orders.csv"
            scraper.append_dedup(
                path,
                [
                    {"order_id": "1", "order_total": "10.00"},
                    {"order_id": "2", "order_total": "20.00"},
                ],
                subset=["order_id"],
                fieldnames=["order_id", "order_total"],
            )
            merged = scraper.append_dedup(
                path,
                [
                    {"order_id": "2", "order_total": "21.50"},
                    {"order_id": "3", "order_total": "30.00"},
                ],
                subset=["order_id"],
                fieldnames=["order_id", "order_total"],
            )
            self.assertEqual(
                [
                    {"order_id": "1", "order_total": "10.00"},
                    {"order_id": "2", "order_total": "21.50"},
                    {"order_id": "3", "order_total": "30.00"},
                ],
                merged,
            )
            with path.open(newline="", encoding="utf-8") as handle:
                rows = list(csv.DictReader(handle))
            self.assertEqual(merged, rows)
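            # Hedged note: append_dedup appears to treat `subset` as the row
            # identity, keeping the newest value for a repeated key (order 2
            # moves to 21.50) while preserving first-seen row order on disk.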

    def test_read_existing_order_ids_returns_known_ids(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            path = Path(tmpdir) / "orders.csv"
            path.write_text("order_id,order_total\n1,10.00\n2,20.00\n", encoding="utf-8")
            self.assertEqual({"1", "2"}, scraper.read_existing_order_ids(path))


if __name__ == "__main__":
    unittest.main()


@@ -1,154 +0,0 @@
import csv
import json
from pathlib import Path

import click

import build_canonical_layer
import build_observed_products
from layer_helpers import stable_id, write_csv_rows

PROOF_FIELDS = [
    "proof_name",
    "canonical_product_id",
    "giant_observed_product_id",
    "costco_observed_product_id",
    "giant_example_item",
    "costco_example_item",
    "notes",
]


def read_rows(path):
    with Path(path).open(newline="", encoding="utf-8") as handle:
        return list(csv.DictReader(handle))


def find_proof_pair(observed_rows):
    giant = None
    costco = None
    for row in observed_rows:
        if row["retailer"] == "giant" and row["representative_name_norm"] == "BANANA":
            giant = row
        if row["retailer"] == "costco" and row["representative_name_norm"] == "BANANA":
            costco = row
    return giant, costco


def merge_proof_pair(canonical_rows, link_rows, giant_row, costco_row):
    if not giant_row or not costco_row:
        return canonical_rows, link_rows, []
    proof_canonical_id = stable_id("gcan", "proof|banana")
    link_rows = [
        row
        for row in link_rows
        if row["observed_product_id"]
        not in {giant_row["observed_product_id"], costco_row["observed_product_id"]}
    ]
    canonical_rows = [
        row
        for row in canonical_rows
        if row["canonical_product_id"] != proof_canonical_id
    ]
    canonical_rows.append(
        {
            "canonical_product_id": proof_canonical_id,
            "canonical_name": "BANANA",
            "product_type": "banana",
            "brand": "",
            "variant": "",
            "size_value": "",
            "size_unit": "",
            "pack_qty": "",
            "measure_type": "weight",
            "normalized_quantity": "",
            "normalized_quantity_unit": "",
            "notes": "manual proof merge for cross-retailer validation",
            "created_at": "",
            "updated_at": "",
        }
    )
    for observed_row in [giant_row, costco_row]:
        link_rows.append(
            {
                "observed_product_id": observed_row["observed_product_id"],
                "canonical_product_id": proof_canonical_id,
                "link_method": "manual_proof_merge",
                "link_confidence": "medium",
                "review_status": "",
                "reviewed_by": "",
                "reviewed_at": "",
                "link_notes": "cross-retailer validation proof",
            }
        )
    proof_rows = [
        {
            "proof_name": "banana",
            "canonical_product_id": proof_canonical_id,
            "giant_observed_product_id": giant_row["observed_product_id"],
            "costco_observed_product_id": costco_row["observed_product_id"],
            "giant_example_item": giant_row["example_item_name"],
            "costco_example_item": costco_row["example_item_name"],
            "notes": "BANANA proof pair built from Giant and Costco enriched rows",
        }
    ]
    return canonical_rows, link_rows, proof_rows
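
# Hedged note: merge_proof_pair reads as idempotent - it strips any existing
# links for the two observed rows and any prior proof canonical row before
# re-adding them, so reruns should not duplicate output rows.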

@click.command()
@click.option(
    "--giant-items-enriched-csv",
    default="giant_output/items_enriched.csv",
    show_default=True,
)
@click.option(
    "--costco-items-enriched-csv",
    default="costco_output/items_enriched.csv",
    show_default=True,
)
@click.option(
    "--outdir",
    default="combined_output",
    show_default=True,
)
def main(giant_items_enriched_csv, costco_items_enriched_csv, outdir):
    outdir = Path(outdir)
    rows = read_rows(giant_items_enriched_csv) + read_rows(costco_items_enriched_csv)
    observed_rows = build_observed_products.build_observed_products(rows)
    canonical_rows, link_rows = build_canonical_layer.build_canonical_layer(observed_rows)
    giant_row, costco_row = find_proof_pair(observed_rows)
    if not giant_row or not costco_row:
        raise click.ClickException(
            "could not find BANANA proof pair across Giant and Costco observed products"
        )
    canonical_rows, link_rows, proof_rows = merge_proof_pair(
        canonical_rows, link_rows, giant_row, costco_row
    )
    write_csv_rows(
        outdir / "products_observed.csv",
        observed_rows,
        build_observed_products.OUTPUT_FIELDS,
    )
    write_csv_rows(
        outdir / "products_canonical.csv",
        canonical_rows,
        build_canonical_layer.CANONICAL_FIELDS,
    )
    write_csv_rows(
        outdir / "product_links.csv",
        link_rows,
        build_canonical_layer.LINK_FIELDS,
    )
    write_csv_rows(outdir / "proof_examples.csv", proof_rows, PROOF_FIELDS)
    click.echo(
        f"wrote combined outputs to {outdir} using {len(observed_rows)} observed rows"
    )


if __name__ == "__main__":
    main()