import json import time from pathlib import Path import pandas as pd import requests from playwright.sync_api import sync_playwright BASE = "https://giantfood.com" ACCOUNT_HISTORY_URL = f"{BASE}/account/history/invoice/in-store" def build_session_via_playwright(headless=False): with sync_playwright() as p: browser = p.firefox.launch(headless=headless) page = browser.new_page() page.goto(ACCOUNT_HISTORY_URL, wait_until="networkidle") print("log in in the browser window, then press enter here...") input() cookies = page.context.cookies() ua = page.evaluate("() => navigator.userAgent") browser.close() s = requests.Session() s.headers.update({ "user-agent": ua, "accept": "application/json, text/plain, */*", "referer": ACCOUNT_HISTORY_URL, }) for c in cookies: # requests wants host-only-ish handling; stripping leading dot is usually fine domain = c.get("domain", "").lstrip(".") or "giantfood.com" s.cookies.set( c["name"], c["value"], domain=domain, path=c.get("path", "/"), ) return s def get_history(session, user_id, loyalty_number, filter_="instore"): url = f"{BASE}/api/v6.0/user/{user_id}/order/history" params = { "filter": filter_, "loyaltyNumber": loyalty_number, } r = session.get(url, params=params, timeout=30) r.raise_for_status() return r.json() def get_order_detail(session, user_id, order_id, is_instore=True): url = f"{BASE}/api/v6.0/user/{user_id}/order/history/detail/{order_id}" params = {"isInStore": str(is_instore).lower()} r = session.get(url, params=params, timeout=30) r.raise_for_status() return r.json() def flatten_orders(history_json, details_jsons): orders_rows = [] items_rows = [] history_records = { rec["orderId"]: rec for rec in history_json.get("records", []) } for d in details_jsons: order_id = d["orderId"] hist = history_records.get(order_id, {}) pup = d.get("pup", {}) orders_rows.append({ "order_id": order_id, "order_date": d.get("orderDate"), "delivery_date": d.get("deliveryDate"), "service_type": hist.get("serviceType"), "payment_method": d.get("paymentMethod"), "order_total": d.get("orderTotal"), "total_item_count": d.get("totalItemCount"), "total_savings": d.get("totalSavings"), "your_savings_total": d.get("yourSavingsTotal"), "coupons_discounts_total": d.get("couponsDiscountsTotal"), "store_name": pup.get("storeName"), "store_number": pup.get("aholdStoreNumber"), "store_address1": pup.get("storeAddress1"), "store_city": pup.get("storeCity"), "store_state": pup.get("storeState"), "store_zipcode": pup.get("storeZipcode"), "refund_order": d.get("refundOrder"), "ebt_order": d.get("ebtOrder"), }) for idx, item in enumerate(d.get("items", []), start=1): items_rows.append({ "order_id": order_id, "order_date": d.get("orderDate"), "line_no": idx, "pod_id": item.get("podId"), "upc": item.get("primUpcCd"), "item_name": item.get("itemName"), "category_id": item.get("categoryId"), "category_desc": item.get("categoryDesc"), "qty": item.get("shipQy"), "unit": item.get("lbEachCd"), "unit_price": item.get("unitPrice"), "grocery_amount": item.get("groceryAmount"), "picked_weight": item.get("totalPickedWeight"), "mvp_savings": item.get("mvpSavings"), "reward_savings": item.get("rewardSavings"), "coupon_savings": item.get("couponSavings"), "coupon_price": item.get("couponPrice"), }) return pd.DataFrame(orders_rows), pd.DataFrame(items_rows) def main(): user_id = "369513017" # move to config/env later loyalty_number = "440155630880" # move to config/env later outdir = Path("giant_output") rawdir = outdir / "raw" rawdir.mkdir(parents=True, exist_ok=True) session = build_session_via_playwright(headless=False) history = get_history(session, user_id, loyalty_number) (rawdir / "history.json").write_text(json.dumps(history, indent=2), encoding="utf-8") details = [] for rec in history.get("records", []): order_id = rec["orderId"] print("fetching", order_id) detail = get_order_detail(session, user_id, order_id, is_instore=True) details.append(detail) (rawdir / f"{order_id}.json").write_text(json.dumps(detail, indent=2), encoding="utf-8") time.sleep(1.5) # dont be a goblin orders_df, items_df = flatten_orders(history, details) orders_df.to_csv(outdir / "orders.csv", index=False) items_df.to_csv(outdir / "items.csv", index=False) # nice optional local db import sqlite3 conn = sqlite3.connect(outdir / "giant.sqlite") orders_df.to_sql("orders", conn, if_exists="replace", index=False) items_df.to_sql("items", conn, if_exists="replace", index=False) conn.close() print(f"wrote {len(orders_df)} orders and {len(items_df)} items to {outdir}") if __name__ == "__main__": main()