"""Export Giant Food in-store order history to deduplicated CSV files.

Reuses the login cookies from the local Firefox profile, pulls the order
history and per-order detail from Giant's JSON API, saves every raw
response under ``<outdir>/raw/``, and maintains ``orders.csv`` and
``items.csv`` that are safe to re-run (new rows are merged and deduped).
"""

import csv
import json
import os
import time
from pathlib import Path

import browser_cookie3
import click
from curl_cffi import requests

try:
    # dotenv is optional: load_config() falls back to plain os.environ
    # when the package is not installed.
    from dotenv import load_dotenv
except ImportError:  # pragma: no cover - optional dependency
    load_dotenv = None

BASE = "https://giantfood.com"
ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store"

# Column order for orders.csv (one row per store visit).
ORDER_FIELDS = [
    "order_id",
    "order_date",
    "delivery_date",
    "service_type",
    "order_total",
    "payment_method",
    "total_item_count",
    "total_savings",
    "your_savings_total",
    "coupons_discounts_total",
    "store_name",
    "store_number",
    "store_address1",
    "store_city",
    "store_state",
    "store_zipcode",
    "refund_order",
    "ebt_order",
]

# Column order for items.csv (one row per purchased line item).
ITEM_FIELDS = [
    "order_id",
    "order_date",
    "line_no",
    "pod_id",
    "item_name",
    "upc",
    "category_id",
    "category",
    "qty",
    "unit",
    "unit_price",
    "line_total",
    "picked_weight",
    "mvp_savings",
    "reward_savings",
    "coupon_savings",
    "coupon_price",
]


def load_config():
    """Return ``{"user_id", "loyalty"}`` from the environment (and .env if available)."""
    if load_dotenv is not None:
        load_dotenv()
    return {
        "user_id": os.getenv("GIANT_USER_ID", "").strip(),
        "loyalty": os.getenv("GIANT_LOYALTY_NUMBER", "").strip(),
    }


def build_session():
    """Create a curl_cffi session carrying Firefox's giantfood.com cookies.

    The user-agent and referer mimic a logged-in browser tab so the API
    accepts the cookie-based authentication.
    """
    session = requests.Session()
    session.cookies.update(browser_cookie3.firefox(domain_name="giantfood.com"))
    session.headers.update(
        {
            "user-agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) "
                "Gecko/20100101 Firefox/148.0"
            ),
            "accept": "application/json, text/plain, */*",
            "accept-language": "en-US,en;q=0.9",
            "referer": ACCOUNT_PAGE,
        }
    )
    return session


def safe_get(session, url, **kwargs):
    """GET ``url`` with up to 3 attempts; return the first HTTP 200 response.

    Non-200 responses and transport errors are retried after a 3-second
    pause. On exhaustion, raises the last response's HTTP error if one was
    received, otherwise a RuntimeError.
    """
    last_response = None
    for attempt in range(3):
        try:
            response = session.get(
                url,
                impersonate="firefox",
                timeout=30,
                **kwargs,
            )
            last_response = response
            if response.status_code == 200:
                return response
            click.echo(f"retry {attempt + 1}/3 status={response.status_code}")
        except Exception as exc:  # pragma: no cover - network error path
            click.echo(f"retry {attempt + 1}/3 error={exc}")
        # Only pause between attempts; sleeping after the final failure
        # would just delay the raise below.
        if attempt < 2:
            time.sleep(3)
    if last_response is not None:
        last_response.raise_for_status()
    raise RuntimeError(f"failed to fetch {url}")


def get_history(session, user_id, loyalty):
    """Fetch the in-store order history listing (JSON dict)."""
    response = safe_get(
        session,
        f"{BASE}/api/v6.0/user/{user_id}/order/history",
        params={"filter": "instore", "loyaltyNumber": loyalty},
    )
    return response.json()


def get_order_detail(session, user_id, order_id):
    """Fetch the full detail payload for a single in-store order (JSON dict)."""
    response = safe_get(
        session,
        f"{BASE}/api/v6.0/user/{user_id}/order/history/detail/{order_id}",
        params={"isInStore": "true"},
    )
    return response.json()


def flatten_orders(history, details):
    """Flatten API payloads into ``(orders, items)`` lists of flat dicts.

    ``history`` supplies per-visit fields (e.g. serviceType) not present in
    the detail payload; details are matched to history records by orderId.
    """
    orders = []
    items = []
    history_lookup = {
        record["orderId"]: record for record in history.get("records", [])
    }
    for detail in details:
        order_id = str(detail["orderId"])
        history_row = history_lookup.get(detail["orderId"], {})
        pickup = detail.get("pup", {})
        orders.append(
            {
                "order_id": order_id,
                "order_date": detail.get("orderDate"),
                "delivery_date": detail.get("deliveryDate"),
                "service_type": history_row.get("serviceType"),
                "order_total": detail.get("orderTotal"),
                "payment_method": detail.get("paymentMethod"),
                "total_item_count": detail.get("totalItemCount"),
                "total_savings": detail.get("totalSavings"),
                "your_savings_total": detail.get("yourSavingsTotal"),
                "coupons_discounts_total": detail.get("couponsDiscountsTotal"),
                "store_name": pickup.get("storeName"),
                "store_number": pickup.get("aholdStoreNumber"),
                "store_address1": pickup.get("storeAddress1"),
                "store_city": pickup.get("storeCity"),
                "store_state": pickup.get("storeState"),
                "store_zipcode": pickup.get("storeZipcode"),
                "refund_order": detail.get("refundOrder"),
                "ebt_order": detail.get("ebtOrder"),
            }
        )
        for line_no, item in enumerate(detail.get("items", []), start=1):
            items.append(
                {
                    "order_id": order_id,
                    "order_date": detail.get("orderDate"),
                    "line_no": str(line_no),
                    "pod_id": item.get("podId"),
                    "item_name": item.get("itemName"),
                    "upc": item.get("primUpcCd"),
                    "category_id": item.get("categoryId"),
                    "category": item.get("categoryDesc"),
                    "qty": item.get("shipQy"),
                    "unit": item.get("lbEachCd"),
                    "unit_price": item.get("unitPrice"),
                    "line_total": item.get("groceryAmount"),
                    "picked_weight": item.get("totalPickedWeight"),
                    "mvp_savings": item.get("mvpSavings"),
                    "reward_savings": item.get("rewardSavings"),
                    "coupon_savings": item.get("couponSavings"),
                    "coupon_price": item.get("couponPrice"),
                }
            )
    return orders, items


def normalize_row(row, fieldnames):
    """Project ``row`` onto ``fieldnames``, stringifying every value."""
    return {field: stringify(row.get(field)) for field in fieldnames}


def stringify(value):
    """Render ``value`` for CSV: None becomes "", everything else str()."""
    if value is None:
        return ""
    return str(value)


def read_csv_rows(path):
    """Return ``(fieldnames, rows)`` from ``path``, or ``([], [])`` if absent."""
    if not path.exists():
        return [], []
    with path.open(newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        fieldnames = reader.fieldnames or []
        return fieldnames, list(reader)


def read_existing_order_ids(path):
    """Return the set of non-empty order_id values already stored in ``path``."""
    _, rows = read_csv_rows(path)
    return {row["order_id"] for row in rows if row.get("order_id")}


def merge_rows(existing_rows, new_rows, subset):
    """Merge rows, deduplicating on the ``subset`` key columns.

    A later duplicate replaces the earlier row in place, so the freshest
    data wins while the original row order is preserved.
    """
    merged = []
    row_index = {}
    for row in existing_rows + new_rows:
        key = tuple(stringify(row.get(field)) for field in subset)
        normalized = dict(row)
        if key in row_index:
            merged[row_index[key]] = normalized
        else:
            row_index[key] = len(merged)
            merged.append(normalized)
    return merged


def append_dedup(path, new_rows, subset, fieldnames):
    """Merge ``new_rows`` into the CSV at ``path`` and rewrite it; return all rows.

    The header becomes the union of existing and expected columns (existing
    order first), so older files gain new columns without losing data.
    """
    existing_fieldnames, existing_rows = read_csv_rows(path)
    all_fieldnames = list(dict.fromkeys(existing_fieldnames + fieldnames))
    merged = merge_rows(
        [normalize_row(row, all_fieldnames) for row in existing_rows],
        [normalize_row(row, all_fieldnames) for row in new_rows],
        subset=subset,
    )
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=all_fieldnames)
        writer.writeheader()
        writer.writerows(merged)
    return merged


def write_json(path, payload):
    """Pretty-print ``payload`` as JSON to ``path`` (raw-response archive)."""
    path.write_text(json.dumps(payload, indent=2), encoding="utf-8")


@click.command()
@click.option("--user-id", default=None, help="Giant user id.")
@click.option("--loyalty", default=None, help="Giant loyalty number.")
@click.option(
    "--outdir",
    default="giant_output",
    show_default=True,
    help="Directory for raw json and csv outputs.",
)
@click.option(
    "--sleep-seconds",
    default=1.5,
    show_default=True,
    type=float,
    help="Delay between order detail requests.",
)
def main(user_id, loyalty, outdir, sleep_seconds):
    """Fetch new Giant in-store orders and merge them into the CSV outputs."""
    # Resolve credentials: CLI option > environment/.env > interactive prompt.
    config = load_config()
    user_id = user_id or config["user_id"] or click.prompt("Giant user id", type=str)
    loyalty = loyalty or config["loyalty"] or click.prompt(
        "Giant loyalty number", type=str
    )
    outdir = Path(outdir)
    rawdir = outdir / "raw"
    rawdir.mkdir(parents=True, exist_ok=True)
    orders_csv = outdir / "orders.csv"
    items_csv = outdir / "items.csv"
    click.echo("Using cookies from your current Firefox profile.")
    click.echo(f"Open Giant here, confirm you're logged in, then return: {ACCOUNT_PAGE}")
    click.pause(info="Press any key once Giant is open and logged in")
    session = build_session()
    click.echo("Fetching order history...")
    history = get_history(session, user_id, loyalty)
    write_json(rawdir / "history.json", history)
    records = history.get("records", [])
    click.echo(f"History returned {len(records)} visits.")
    click.echo(
        "Note: Giant appears to expose only the most recent 50 visits, "
        "so run this periodically if you want full continuity."
    )
    # Only fetch details for orders not already present in orders.csv.
    history_order_ids = [str(record["orderId"]) for record in records]
    existing_order_ids = read_existing_order_ids(orders_csv)
    new_order_ids = [
        order_id for order_id in history_order_ids if order_id not in existing_order_ids
    ]
    click.echo(f"Existing orders in csv: {len(existing_order_ids)}")
    click.echo(f"New orders to fetch: {len(new_order_ids)}")
    if not new_order_ids:
        click.echo("No new orders found. Done.")
        return
    details = []
    for order_id in new_order_ids:
        click.echo(f"Fetching {order_id}")
        detail = get_order_detail(session, user_id, order_id)
        details.append(detail)
        write_json(rawdir / f"{order_id}.json", detail)
        # Be polite to the API between detail requests.
        time.sleep(sleep_seconds)
    click.echo("Flattening new data...")
    orders, items = flatten_orders(history, details)
    all_orders = append_dedup(
        orders_csv,
        orders,
        subset=["order_id"],
        fieldnames=ORDER_FIELDS,
    )
    all_items = append_dedup(
        items_csv,
        items,
        subset=["order_id", "line_no", "item_name", "upc", "line_total"],
        fieldnames=ITEM_FIELDS,
    )
    click.echo("Done.")
    click.echo(f"Orders csv: {orders_csv}")
    click.echo(f"Items csv: {items_csv}")
    click.echo(f"Total orders stored: {len(all_orders)}")
    click.echo(f"Total item rows stored: {len(all_items)}")


if __name__ == "__main__":
    main()