Harden giant receipt fetch CLI

This commit is contained in:
ben
2026-03-14 18:32:32 -04:00
parent 585d8c1e49
commit d57b9cf52f
8 changed files with 456 additions and 470 deletions

View File

@@ -1,29 +1,88 @@
import csv
import json
import os
import time
from pathlib import Path
import browser_cookie3
import pandas as pd
from curl_cffi import requests
import click
BASE = "https://giantfood.com"
ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store"
USER_ID = "369513017"
LOYALTY = "440155630880"
ORDER_FIELDS = [
"order_id",
"order_date",
"delivery_date",
"service_type",
"order_total",
"payment_method",
"total_item_count",
"total_savings",
"your_savings_total",
"coupons_discounts_total",
"store_name",
"store_number",
"store_address1",
"store_city",
"store_state",
"store_zipcode",
"refund_order",
"ebt_order",
]
ITEM_FIELDS = [
"order_id",
"order_date",
"line_no",
"pod_id",
"item_name",
"upc",
"category_id",
"category",
"qty",
"unit",
"unit_price",
"line_total",
"picked_weight",
"mvp_savings",
"reward_savings",
"coupon_savings",
"coupon_price",
]
def load_config():
try:
from dotenv import load_dotenv
except ImportError:
load_dotenv = None
if load_dotenv is not None:
load_dotenv()
return {
"user_id": os.getenv("GIANT_USER_ID", "").strip(),
"loyalty": os.getenv("GIANT_LOYALTY_NUMBER", "").strip(),
}
def build_session():
s = requests.Session()
s.cookies.update(browser_cookie3.firefox(domain_name="giantfood.com"))
s.headers.update({
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) Gecko/20100101 Firefox/148.0",
"accept": "application/json, text/plain, */*",
"accept-language": "en-US,en;q=0.9",
"referer": ACCOUNT_PAGE,
})
return s
session = requests.Session()
session.cookies.update(browser_cookie3.firefox(domain_name="giantfood.com"))
session.headers.update(
{
"user-agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) "
"Gecko/20100101 Firefox/148.0"
),
"accept": "application/json, text/plain, */*",
"accept-language": "en-US,en;q=0.9",
"referer": ACCOUNT_PAGE,
}
)
return session
def safe_get(session, url, **kwargs):
@@ -31,20 +90,20 @@ def safe_get(session, url, **kwargs):
for attempt in range(3):
try:
r = session.get(
response = session.get(
url,
impersonate="firefox",
timeout=30,
**kwargs,
)
last_response = r
last_response = response
if r.status_code == 200:
return r
if response.status_code == 200:
return response
print(f"retry {attempt + 1}/3 status={r.status_code}")
except Exception as e:
print(f"retry {attempt + 1}/3 error={e}")
click.echo(f"retry {attempt + 1}/3 status={response.status_code}")
except Exception as exc: # pragma: no cover - network error path
click.echo(f"retry {attempt + 1}/3 error={exc}")
time.sleep(3)
@@ -54,128 +113,234 @@ def safe_get(session, url, **kwargs):
raise RuntimeError(f"failed to fetch {url}")
def get_history(session):
url = f"{BASE}/api/v6.0/user/{USER_ID}/order/history"
r = safe_get(
def get_history(session, user_id, loyalty):
response = safe_get(
session,
url,
params={
"filter": "instore",
"loyaltyNumber": LOYALTY,
},
f"{BASE}/api/v6.0/user/{user_id}/order/history",
params={"filter": "instore", "loyaltyNumber": loyalty},
)
return r.json()
return response.json()
def get_order_detail(session, order_id):
url = f"{BASE}/api/v6.0/user/{USER_ID}/order/history/detail/{order_id}"
r = safe_get(
def get_order_detail(session, user_id, order_id):
response = safe_get(
session,
url,
f"{BASE}/api/v6.0/user/{user_id}/order/history/detail/{order_id}",
params={"isInStore": "true"},
)
return r.json()
return response.json()
def flatten_orders(history, details):
orders = []
items = []
history_lookup = {record["orderId"]: record for record in history.get("records", [])}
history_lookup = {
r["orderId"]: r
for r in history.get("records", [])
}
for detail in details:
order_id = str(detail["orderId"])
history_row = history_lookup.get(detail["orderId"], {})
pickup = detail.get("pup", {})
for d in details:
hist = history_lookup.get(d["orderId"], {})
pup = d.get("pup", {})
orders.append(
{
"order_id": order_id,
"order_date": detail.get("orderDate"),
"delivery_date": detail.get("deliveryDate"),
"service_type": history_row.get("serviceType"),
"order_total": detail.get("orderTotal"),
"payment_method": detail.get("paymentMethod"),
"total_item_count": detail.get("totalItemCount"),
"total_savings": detail.get("totalSavings"),
"your_savings_total": detail.get("yourSavingsTotal"),
"coupons_discounts_total": detail.get("couponsDiscountsTotal"),
"store_name": pickup.get("storeName"),
"store_number": pickup.get("aholdStoreNumber"),
"store_address1": pickup.get("storeAddress1"),
"store_city": pickup.get("storeCity"),
"store_state": pickup.get("storeState"),
"store_zipcode": pickup.get("storeZipcode"),
"refund_order": detail.get("refundOrder"),
"ebt_order": detail.get("ebtOrder"),
}
)
orders.append({
"order_id": d["orderId"],
"order_date": d.get("orderDate"),
"delivery_date": d.get("deliveryDate"),
"service_type": hist.get("serviceType"),
"order_total": d.get("orderTotal"),
"payment_method": d.get("paymentMethod"),
"total_item_count": d.get("totalItemCount"),
"total_savings": d.get("totalSavings"),
"your_savings_total": d.get("yourSavingsTotal"),
"coupons_discounts_total": d.get("couponsDiscountsTotal"),
"store_name": pup.get("storeName"),
"store_number": pup.get("aholdStoreNumber"),
"store_address1": pup.get("storeAddress1"),
"store_city": pup.get("storeCity"),
"store_state": pup.get("storeState"),
"store_zipcode": pup.get("storeZipcode"),
"refund_order": d.get("refundOrder"),
"ebt_order": d.get("ebtOrder"),
})
for line_no, item in enumerate(detail.get("items", []), start=1):
items.append(
{
"order_id": order_id,
"order_date": detail.get("orderDate"),
"line_no": str(line_no),
"pod_id": item.get("podId"),
"item_name": item.get("itemName"),
"upc": item.get("primUpcCd"),
"category_id": item.get("categoryId"),
"category": item.get("categoryDesc"),
"qty": item.get("shipQy"),
"unit": item.get("lbEachCd"),
"unit_price": item.get("unitPrice"),
"line_total": item.get("groceryAmount"),
"picked_weight": item.get("totalPickedWeight"),
"mvp_savings": item.get("mvpSavings"),
"reward_savings": item.get("rewardSavings"),
"coupon_savings": item.get("couponSavings"),
"coupon_price": item.get("couponPrice"),
}
)
for i, item in enumerate(d.get("items", []), start=1):
items.append({
"order_id": d["orderId"],
"order_date": d.get("orderDate"),
"line_no": i,
"pod_id": item.get("podId"),
"item_name": item.get("itemName"),
"upc": item.get("primUpcCd"),
"category_id": item.get("categoryId"),
"category": item.get("categoryDesc"),
"qty": item.get("shipQy"),
"unit": item.get("lbEachCd"),
"unit_price": item.get("unitPrice"),
"line_total": item.get("groceryAmount"),
"picked_weight": item.get("totalPickedWeight"),
"mvp_savings": item.get("mvpSavings"),
"reward_savings": item.get("rewardSavings"),
"coupon_savings": item.get("couponSavings"),
"coupon_price": item.get("couponPrice"),
})
return pd.DataFrame(orders), pd.DataFrame(items)
return orders, items
def main():
outdir = Path("giant_output")
def normalize_row(row, fieldnames):
return {field: stringify(row.get(field)) for field in fieldnames}
def stringify(value):
if value is None:
return ""
return str(value)
def read_csv_rows(path):
if not path.exists():
return [], []
with path.open(newline="", encoding="utf-8") as handle:
reader = csv.DictReader(handle)
fieldnames = reader.fieldnames or []
return fieldnames, list(reader)
def read_existing_order_ids(path):
_, rows = read_csv_rows(path)
return {row["order_id"] for row in rows if row.get("order_id")}
def merge_rows(existing_rows, new_rows, subset):
merged = []
row_index = {}
for row in existing_rows + new_rows:
key = tuple(stringify(row.get(field)) for field in subset)
normalized = dict(row)
if key in row_index:
merged[row_index[key]] = normalized
else:
row_index[key] = len(merged)
merged.append(normalized)
return merged
def append_dedup(path, new_rows, subset, fieldnames):
existing_fieldnames, existing_rows = read_csv_rows(path)
all_fieldnames = list(dict.fromkeys(existing_fieldnames + fieldnames))
merged = merge_rows(
[normalize_row(row, all_fieldnames) for row in existing_rows],
[normalize_row(row, all_fieldnames) for row in new_rows],
subset=subset,
)
with path.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=all_fieldnames)
writer.writeheader()
writer.writerows(merged)
return merged
def write_json(path, payload):
path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
@click.command()
@click.option("--user-id", default=None, help="Giant user id.")
@click.option("--loyalty", default=None, help="Giant loyalty number.")
@click.option(
"--outdir",
default="giant_output",
show_default=True,
help="Directory for raw json and csv outputs.",
)
@click.option(
"--sleep-seconds",
default=1.5,
show_default=True,
type=float,
help="Delay between order detail requests.",
)
def main(user_id, loyalty, outdir, sleep_seconds):
config = load_config()
user_id = user_id or config["user_id"] or click.prompt("Giant user id", type=str)
loyalty = loyalty or config["loyalty"] or click.prompt(
"Giant loyalty number", type=str
)
outdir = Path(outdir)
rawdir = outdir / "raw"
rawdir.mkdir(parents=True, exist_ok=True)
orders_csv = outdir / "orders.csv"
items_csv = outdir / "items.csv"
click.echo("Using cookies from your current Firefox profile.")
click.echo(f"Open Giant here, confirm you're logged in, then return: {ACCOUNT_PAGE}")
click.pause(info="Press any key once Giant is open and logged in")
session = build_session()
print("fetching order history...")
history = get_history(session)
click.echo("Fetching order history...")
history = get_history(session, user_id, loyalty)
write_json(rawdir / "history.json", history)
(rawdir / "history.json").write_text(
json.dumps(history, indent=2),
encoding="utf-8",
records = history.get("records", [])
click.echo(f"History returned {len(records)} visits.")
click.echo(
"Note: Giant appears to expose only the most recent 50 visits, "
"so run this periodically if you want full continuity."
)
order_ids = [r["orderId"] for r in history.get("records", [])]
print(f"{len(order_ids)} orders found")
history_order_ids = [str(record["orderId"]) for record in records]
existing_order_ids = read_existing_order_ids(orders_csv)
new_order_ids = [order_id for order_id in history_order_ids if order_id not in existing_order_ids]
click.echo(f"Existing orders in csv: {len(existing_order_ids)}")
click.echo(f"New orders to fetch: {len(new_order_ids)}")
if not new_order_ids:
click.echo("No new orders found. Done.")
return
details = []
for order_id in order_ids:
print(f"fetching {order_id}")
d = get_order_detail(session, order_id)
details.append(d)
for order_id in new_order_ids:
click.echo(f"Fetching {order_id}")
detail = get_order_detail(session, user_id, order_id)
details.append(detail)
write_json(rawdir / f"{order_id}.json", detail)
time.sleep(sleep_seconds)
(rawdir / f"{order_id}.json").write_text(
json.dumps(d, indent=2),
encoding="utf-8",
)
click.echo("Flattening new data...")
orders, items = flatten_orders(history, details)
time.sleep(1.5)
all_orders = append_dedup(
orders_csv,
orders,
subset=["order_id"],
fieldnames=ORDER_FIELDS,
)
all_items = append_dedup(
items_csv,
items,
subset=["order_id", "line_no", "item_name", "upc", "line_total"],
fieldnames=ITEM_FIELDS,
)
print("flattening data...")
orders_df, items_df = flatten_orders(history, details)
orders_df.to_csv(outdir / "orders.csv", index=False)
items_df.to_csv(outdir / "items.csv", index=False)
print("done")
print(f"{len(orders_df)} orders written to {outdir / 'orders.csv'}")
print(f"{len(items_df)} items written to {outdir / 'items.csv'}")
click.echo("Done.")
click.echo(f"Orders csv: {orders_csv}")
click.echo(f"Items csv: {items_csv}")
click.echo(f"Total orders stored: {len(all_orders)}")
click.echo(f"Total item rows stored: {len(all_items)}")
if __name__ == "__main__":
main()
main()