"""Export in-store purchase history from giantfood.com into CSV files.

The script reuses the cookies of the local Firefox profile, downloads the
order history and the detail of every order not yet present in
``orders.csv``, stores the raw JSON responses, and appends flattened
order/item rows to ``orders.csv`` and ``items.csv``.
"""

import json
import os
import time
from pathlib import Path

import browser_cookie3
import click
import pandas as pd
from curl_cffi import requests
from dotenv import load_dotenv

BASE = "https://giantfood.com"
ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store"

# Retry policy used by safe_get.
_MAX_ATTEMPTS = 3
_RETRY_DELAY_SECONDS = 3

# Explicit column orders so that empty result sets still produce frames
# carrying every column (drop_duplicates(subset=...) needs them to exist).
_ORDER_COLUMNS = [
    "order_id",
    "order_date",
    "delivery_date",
    "service_type",
    "order_total",
    "payment_method",
    "total_item_count",
    "total_savings",
    "your_savings_total",
    "coupons_discounts_total",
    "store_name",
    "store_number",
    "store_address1",
    "store_city",
    "store_state",
    "store_zipcode",
    "refund_order",
    "ebt_order",
]
_ITEM_COLUMNS = [
    "order_id",
    "order_date",
    "line_no",
    "pod_id",
    "item_name",
    "upc",
    "category_id",
    "category",
    "qty",
    "unit",
    "unit_price",
    "line_total",
    "picked_weight",
    "mvp_savings",
    "reward_savings",
    "coupon_savings",
    "coupon_price",
]


def load_config():
    """Load ``.env`` and return the user id and loyalty number from the environment.

    Returns:
        A dict with keys ``"user_id"`` and ``"loyalty"``; each value is the
        stripped environment variable, or ``""`` when it is not set.
    """
    load_dotenv()
    return {
        "user_id": os.getenv("GIANT_USER_ID", "").strip(),
        "loyalty": os.getenv("GIANT_LOYALTY_NUMBER", "").strip(),
    }


def build_session():
    """Create an HTTP session carrying the Firefox cookies for giantfood.com.

    Returns:
        A ``curl_cffi`` session with cookies and browser-like default headers.
    """
    session = requests.Session()
    session.cookies.update(browser_cookie3.firefox(domain_name="giantfood.com"))
    session.headers.update({
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) Gecko/20100101 Firefox/148.0",
        "accept": "application/json, text/plain, */*",
        "accept-language": "en-US,en;q=0.9",
        "referer": ACCOUNT_PAGE,
    })
    return session


def safe_get(session, url, **kwargs):
    """GET ``url`` with up to three attempts and return the first 200 response.

    Args:
        session: Session object exposing ``get``.
        url: Address to fetch.
        **kwargs: Extra keyword arguments forwarded to ``session.get``.

    Returns:
        The response whose status code is 200.

    Raises:
        RuntimeError: When no attempt succeeded and the last received
            response (if any) did not raise from ``raise_for_status``.
        Exception: Whatever ``raise_for_status`` raises for the last
            received non-200 response.
    """
    last_response = None
    last_error = None
    for attempt in range(1, _MAX_ATTEMPTS + 1):
        try:
            response = session.get(
                url,
                impersonate="firefox",
                timeout=30,
                **kwargs,
            )
        except Exception as exc:  # retry boundary: any transport error is retried
            last_error = exc
            click.echo(f"retry {attempt}/{_MAX_ATTEMPTS} error={exc}")
        else:
            last_response = response
            if response.status_code == 200:
                return response
            click.echo(f"retry {attempt}/{_MAX_ATTEMPTS} status={response.status_code}")
        # Only wait when another attempt follows; sleeping after the final
        # failure would just delay the error.
        if attempt < _MAX_ATTEMPTS:
            time.sleep(_RETRY_DELAY_SECONDS)

    if last_response is not None:
        last_response.raise_for_status()
    raise RuntimeError(f"failed to fetch {url}") from last_error


def get_history(session, user_id, loyalty):
    """Fetch the in-store order history for ``user_id`` and return the parsed JSON."""
    url = f"{BASE}/api/v6.0/user/{user_id}/order/history"
    response = safe_get(
        session,
        url,
        params={
            "filter": "instore",
            "loyaltyNumber": loyalty,
        },
    )
    return response.json()


def get_order_detail(session, user_id, order_id):
    """Fetch the detail of one in-store order and return the parsed JSON."""
    url = f"{BASE}/api/v6.0/user/{user_id}/order/history/detail/{order_id}"
    response = safe_get(
        session,
        url,
        params={"isInStore": "true"},
    )
    return response.json()


def flatten_orders(history, details):
    """Flatten order detail payloads into an orders frame and an items frame.

    Args:
        history: History payload; its ``"records"`` entries supply
            ``serviceType`` per ``orderId``.
        details: Iterable of order detail payloads. Each must contain
            ``"orderId"``; every other field is optional.

    Returns:
        ``(orders_df, items_df)``. Both frames always carry their full
        column set, even when there are no rows.
    """
    # Keys are normalised to str so an int/str mismatch between the history
    # and detail payloads does not silently drop the service type.
    service_types = {
        str(record["orderId"]): record.get("serviceType")
        for record in history.get("records") or []
    }

    order_rows = []
    item_rows = []
    for detail in details:
        order_id = detail["orderId"]
        order_date = detail.get("orderDate")
        # "pup" / "items" may be present but null; treat that like absent.
        store = detail.get("pup") or {}

        order_rows.append({
            "order_id": order_id,
            "order_date": order_date,
            "delivery_date": detail.get("deliveryDate"),
            "service_type": service_types.get(str(order_id)),
            "order_total": detail.get("orderTotal"),
            "payment_method": detail.get("paymentMethod"),
            "total_item_count": detail.get("totalItemCount"),
            "total_savings": detail.get("totalSavings"),
            "your_savings_total": detail.get("yourSavingsTotal"),
            "coupons_discounts_total": detail.get("couponsDiscountsTotal"),
            "store_name": store.get("storeName"),
            "store_number": store.get("aholdStoreNumber"),
            "store_address1": store.get("storeAddress1"),
            "store_city": store.get("storeCity"),
            "store_state": store.get("storeState"),
            "store_zipcode": store.get("storeZipcode"),
            "refund_order": detail.get("refundOrder"),
            "ebt_order": detail.get("ebtOrder"),
        })

        for line_no, item in enumerate(detail.get("items") or [], start=1):
            item_rows.append({
                "order_id": order_id,
                "order_date": order_date,
                "line_no": line_no,
                "pod_id": item.get("podId"),
                "item_name": item.get("itemName"),
                "upc": item.get("primUpcCd"),
                "category_id": item.get("categoryId"),
                "category": item.get("categoryDesc"),
                "qty": item.get("shipQy"),
                "unit": item.get("lbEachCd"),
                "unit_price": item.get("unitPrice"),
                "line_total": item.get("groceryAmount"),
                "picked_weight": item.get("totalPickedWeight"),
                "mvp_savings": item.get("mvpSavings"),
                "reward_savings": item.get("rewardSavings"),
                "coupon_savings": item.get("couponSavings"),
                "coupon_price": item.get("couponPrice"),
            })

    return (
        pd.DataFrame(order_rows, columns=_ORDER_COLUMNS),
        pd.DataFrame(item_rows, columns=_ITEM_COLUMNS),
    )


def read_existing_order_ids(orders_csv: Path) -> set[str]:
    """Return the order ids already stored in ``orders_csv``.

    Best-effort: a missing or unreadable file, or one without an
    ``order_id`` column, yields an empty set. Read failures are reported
    on stderr instead of being swallowed silently.
    """
    if not orders_csv.exists():
        return set()
    try:
        df = pd.read_csv(orders_csv, dtype={"order_id": str})
    except (OSError, ValueError) as exc:
        # pandas' EmptyDataError / ParserError are ValueError subclasses.
        click.echo(f"warning: could not read {orders_csv}: {exc}", err=True)
        return set()
    if "order_id" not in df.columns:
        return set()
    return set(df["order_id"].dropna().astype(str))


def _to_text(value):
    """Return ``str(value)``, keeping ``None`` / NaN as a missing value."""
    if value is None or (isinstance(value, float) and value != value):
        return None
    return str(value)


def _as_text_frame(df: pd.DataFrame) -> pd.DataFrame:
    """Copy ``df`` with every present value converted to ``str``.

    Unlike ``DataFrame.astype(str)``, missing values stay missing, so they
    are written to CSV as empty cells rather than "None" / "nan".
    """
    return pd.DataFrame(
        {column: df[column].map(_to_text) for column in df.columns},
        index=df.index,
        columns=df.columns,
    )


def append_dedup(existing_path: Path, new_df: pd.DataFrame, subset: list[str]) -> pd.DataFrame:
    """Merge ``new_df`` into the CSV at ``existing_path`` and rewrite the file.

    Args:
        existing_path: CSV file to extend; created when absent.
        new_df: Rows to add. Values are stored as text.
        subset: Columns identifying a row; on duplicates the last
            occurrence (i.e. the new row) is kept.

    Returns:
        The combined, de-duplicated frame that was written.
    """
    fresh = _as_text_frame(new_df)

    previous = None
    if existing_path.exists():
        try:
            previous = pd.read_csv(existing_path, dtype=str)
        except pd.errors.EmptyDataError:
            # A zero-byte file holds no rows; treat it as no prior data.
            previous = None

    if previous is None or previous.empty:
        combined = fresh
    elif fresh.empty:
        combined = previous
    else:
        combined = pd.concat([previous, fresh], ignore_index=True)

    combined = combined.drop_duplicates(subset=subset, keep="last")
    combined.to_csv(existing_path, index=False)
    return combined


@click.command()
@click.option("--user-id", default=None, help="giant user id")
@click.option("--loyalty", default=None, help="giant loyalty number")
@click.option("--outdir", default="giant_output", show_default=True, help="output directory")
@click.option("--sleep-seconds", default=1.5, show_default=True, type=float, help="delay between detail requests")
def main(user_id, loyalty, outdir, sleep_seconds):
    """Download new in-store orders and append them to orders.csv / items.csv."""
    cfg = load_config()
    # Precedence: command-line option, then environment, then interactive prompt.
    user_id = user_id or cfg["user_id"] or click.prompt("giant user id", type=str)
    loyalty = loyalty or cfg["loyalty"] or click.prompt("giant loyalty number", type=str)

    outdir = Path(outdir)
    rawdir = outdir / "raw"
    rawdir.mkdir(parents=True, exist_ok=True)
    orders_csv = outdir / "orders.csv"
    items_csv = outdir / "items.csv"

    click.echo("using cookies from your current firefox profile.")
    click.echo(f"open giant here, make sure you're logged in, then return: {ACCOUNT_PAGE}")
    click.pause(info="press any key once giant is open and logged in")

    session = build_session()

    click.echo("fetching order history...")
    history = get_history(session, user_id, loyalty)
    (rawdir / "history.json").write_text(
        json.dumps(history, indent=2),
        encoding="utf-8",
    )

    records = history.get("records") or []
    click.echo(f"history returned {len(records)} visits")
    click.echo("tip: giant appears to expose only the most recent 50 visits, so run this periodically if you want full continuity.")

    history_order_ids = [str(record["orderId"]) for record in records]
    existing_order_ids = read_existing_order_ids(orders_csv)
    new_order_ids = [oid for oid in history_order_ids if oid not in existing_order_ids]

    click.echo(f"existing orders in csv: {len(existing_order_ids)}")
    click.echo(f"new orders to fetch: {len(new_order_ids)}")

    if not new_order_ids:
        click.echo("no new orders found. done.")
        return

    details = []
    for order_id in new_order_ids:
        click.echo(f"fetching {order_id}")
        detail = get_order_detail(session, user_id, order_id)
        details.append(detail)
        # Keep the raw response next to the CSVs for later re-processing.
        (rawdir / f"{order_id}.json").write_text(
            json.dumps(detail, indent=2),
            encoding="utf-8",
        )
        time.sleep(sleep_seconds)

    click.echo("flattening new data...")
    orders_df, items_df = flatten_orders(history, details)

    orders_all = append_dedup(
        orders_csv,
        orders_df,
        subset=["order_id"],
    )
    items_all = append_dedup(
        items_csv,
        items_df,
        subset=["order_id", "line_no", "item_name", "upc", "line_total"],
    )

    click.echo("done")
    click.echo(f"orders csv: {orders_csv}")
    click.echo(f"items csv: {items_csv}")
    click.echo(f"total orders stored: {len(orders_all)}")
    click.echo(f"total item rows stored: {len(items_all)}")


if __name__ == "__main__":
    main()