"""Collect in-store order history from giantfood.com into CSV files.

Reuses the logged-in Firefox session's cookies (via ``browser_session``) to
call Giant's order-history API, saves each raw JSON response under
``<outdir>/raw/``, and appends flattened order / item rows to de-duplicated
CSV files (``orders.csv`` / ``items.csv``).
"""

import csv
import json
import os
import time
from pathlib import Path

import click

try:
    # python-dotenv is optional: load_config() checks for None before using it.
    from dotenv import load_dotenv
except ImportError:  # pragma: no cover - optional dependency
    load_dotenv = None

from curl_cffi import requests

from browser_session import find_firefox_profile_dir, load_firefox_cookies

BASE = "https://giantfood.com"
ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store"
RETAILER = "giant"

# Column order for orders.csv.
ORDER_FIELDS = [
    "retailer",
    "order_id",
    "order_date",
    "delivery_date",
    "service_type",
    "order_total",
    "payment_method",
    "total_item_count",
    "total_savings",
    "your_savings_total",
    "coupons_discounts_total",
    "store_name",
    "store_number",
    "store_address1",
    "store_city",
    "store_state",
    "store_zipcode",
    "refund_order",
    "ebt_order",
    "raw_history_path",
    "raw_order_path",
]

# Column order for items.csv.
ITEM_FIELDS = [
    "retailer",
    "order_id",
    "order_date",
    "line_no",
    "retailer_item_id",
    "pod_id",
    "item_name",
    "upc",
    "category_id",
    "category",
    "qty",
    "unit",
    "unit_price",
    "line_total",
    "picked_weight",
    "mvp_savings",
    "reward_savings",
    "coupon_savings",
    "coupon_price",
    "image_url",
    "raw_order_path",
    "is_discount_line",
    "is_coupon_line",
]


def load_config():
    """Return Giant credentials from the environment (optionally via .env).

    ``load_dotenv`` is None when python-dotenv is not installed; in that case
    only already-exported environment variables are read.
    """
    if load_dotenv is not None:
        load_dotenv()
    return {
        "user_id": os.getenv("GIANT_USER_ID", "").strip(),
        "loyalty": os.getenv("GIANT_LOYALTY_NUMBER", "").strip(),
    }


def build_session():
    """Build a curl_cffi session carrying the Firefox login cookies.

    The user-agent mirrors the Firefox profile the cookies came from so the
    API sees a consistent client.
    """
    profile_dir = find_firefox_profile_dir()
    session = requests.Session()
    session.cookies.update(load_firefox_cookies("giantfood.com", profile_dir))
    session.headers.update(
        {
            "user-agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) "
                "Gecko/20100101 Firefox/148.0"
            ),
            "accept": "application/json, text/plain, */*",
            "accept-language": "en-US,en;q=0.9",
            "referer": ACCOUNT_PAGE,
        }
    )
    return session


def safe_get(session, url, max_attempts=3, **kwargs):
    """GET *url*, retrying on errors; return the first 200 response.

    Tries up to *max_attempts* times with a 3-second pause between attempts
    (no pause after the final failure).  On exhaustion, raises the last HTTP
    error via ``raise_for_status``, or RuntimeError if no response was ever
    received.
    """
    last_response = None
    for attempt in range(max_attempts):
        try:
            response = session.get(
                url,
                impersonate="firefox",
                timeout=30,
                **kwargs,
            )
            last_response = response
            if response.status_code == 200:
                return response
            click.echo(f"retry {attempt + 1}/{max_attempts} status={response.status_code}")
        except Exception as exc:  # pragma: no cover - network error path
            click.echo(f"retry {attempt + 1}/{max_attempts} error={exc}")
        # Do not sleep after the last attempt; we are about to raise anyway.
        if attempt + 1 < max_attempts:
            time.sleep(3)
    if last_response is not None:
        last_response.raise_for_status()
    raise RuntimeError(f"failed to fetch {url}")


def get_history(session, user_id, loyalty):
    """Fetch the in-store order-history listing as parsed JSON."""
    response = safe_get(
        session,
        f"{BASE}/api/v6.0/user/{user_id}/order/history",
        params={"filter": "instore", "loyaltyNumber": loyalty},
    )
    return response.json()


def get_order_detail(session, user_id, order_id):
    """Fetch the detailed receipt for a single in-store order as parsed JSON."""
    response = safe_get(
        session,
        f"{BASE}/api/v6.0/user/{user_id}/order/history/detail/{order_id}",
        params={"isInStore": "true"},
    )
    return response.json()


def flatten_orders(history, details, history_path=None, raw_dir=None):
    """Flatten API payloads into (orders, items) lists of CSV-ready dicts.

    *history* supplies per-visit fields (e.g. serviceType) keyed by orderId;
    *details* is the list of per-order detail payloads.  ``history_path`` and
    ``raw_dir`` are recorded as provenance columns when given.
    """
    orders = []
    items = []
    history_lookup = {record["orderId"]: record for record in history.get("records", [])}
    history_path_value = history_path.as_posix() if history_path else ""
    for detail in details:
        order_id = str(detail["orderId"])
        history_row = history_lookup.get(detail["orderId"], {})
        # "pup" (pickup) carries the store identity for in-store receipts.
        pickup = detail.get("pup", {})
        raw_order_path = (raw_dir / f"{order_id}.json").as_posix() if raw_dir else ""
        orders.append(
            {
                "retailer": RETAILER,
                "order_id": order_id,
                "order_date": detail.get("orderDate"),
                "delivery_date": detail.get("deliveryDate"),
                "service_type": history_row.get("serviceType"),
                "order_total": detail.get("orderTotal"),
                "payment_method": detail.get("paymentMethod"),
                "total_item_count": detail.get("totalItemCount"),
                "total_savings": detail.get("totalSavings"),
                "your_savings_total": detail.get("yourSavingsTotal"),
                "coupons_discounts_total": detail.get("couponsDiscountsTotal"),
                "store_name": pickup.get("storeName"),
                "store_number": pickup.get("aholdStoreNumber"),
                "store_address1": pickup.get("storeAddress1"),
                "store_city": pickup.get("storeCity"),
                "store_state": pickup.get("storeState"),
                "store_zipcode": pickup.get("storeZipcode"),
                "refund_order": detail.get("refundOrder"),
                "ebt_order": detail.get("ebtOrder"),
                "raw_history_path": history_path_value,
                "raw_order_path": raw_order_path,
            }
        )
        for line_no, item in enumerate(detail.get("items", []), start=1):
            items.append(
                {
                    "retailer": RETAILER,
                    "order_id": order_id,
                    "order_date": detail.get("orderDate"),
                    "line_no": str(line_no),
                    "retailer_item_id": "",
                    "pod_id": item.get("podId"),
                    "item_name": item.get("itemName"),
                    "upc": item.get("primUpcCd"),
                    "category_id": item.get("categoryId"),
                    "category": item.get("categoryDesc"),
                    "qty": item.get("shipQy"),
                    "unit": item.get("lbEachCd"),
                    "unit_price": item.get("unitPrice"),
                    "line_total": item.get("groceryAmount"),
                    "picked_weight": item.get("totalPickedWeight"),
                    "mvp_savings": item.get("mvpSavings"),
                    "reward_savings": item.get("rewardSavings"),
                    "coupon_savings": item.get("couponSavings"),
                    "coupon_price": item.get("couponPrice"),
                    "image_url": "",
                    "raw_order_path": raw_order_path,
                    "is_discount_line": "false",
                    "is_coupon_line": "false",
                }
            )
    return orders, items


def normalize_row(row, fieldnames):
    """Project *row* onto *fieldnames*, stringifying every value."""
    return {field: stringify(row.get(field)) for field in fieldnames}


def stringify(value):
    """Return ``str(value)``, mapping None to the empty string."""
    if value is None:
        return ""
    return str(value)


def read_csv_rows(path):
    """Return (fieldnames, rows) from *path*, or ([], []) if it is absent."""
    if not path.exists():
        return [], []
    with path.open(newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        fieldnames = reader.fieldnames or []
        return fieldnames, list(reader)


def read_existing_order_ids(path):
    """Return the set of non-empty order_id values already stored in *path*."""
    _, rows = read_csv_rows(path)
    return {row["order_id"] for row in rows if row.get("order_id")}


def merge_rows(existing_rows, new_rows, subset):
    """Merge rows, de-duplicating on the *subset* key columns.

    Later rows overwrite earlier rows with the same key, but keep the first
    occurrence's position so existing file order is preserved.
    """
    merged = []
    row_index = {}
    for row in existing_rows + new_rows:
        key = tuple(stringify(row.get(field)) for field in subset)
        normalized = dict(row)
        if key in row_index:
            merged[row_index[key]] = normalized
        else:
            row_index[key] = len(merged)
            merged.append(normalized)
    return merged


def append_dedup(path, new_rows, subset, fieldnames):
    """Append *new_rows* to the CSV at *path*, de-duplicating on *subset*.

    The header becomes the union of existing and requested *fieldnames*
    (existing order first), the whole file is rewritten, and the merged rows
    are returned.
    """
    existing_fieldnames, existing_rows = read_csv_rows(path)
    all_fieldnames = list(dict.fromkeys(existing_fieldnames + fieldnames))
    merged = merge_rows(
        [normalize_row(row, all_fieldnames) for row in existing_rows],
        [normalize_row(row, all_fieldnames) for row in new_rows],
        subset=subset,
    )
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=all_fieldnames)
        writer.writeheader()
        writer.writerows(merged)
    return merged


def write_json(path, payload):
    """Write *payload* to *path* as pretty-printed JSON."""
    path.write_text(json.dumps(payload, indent=2), encoding="utf-8")


@click.command()
@click.option("--user-id", default=None, help="Giant user id.")
@click.option("--loyalty", default=None, help="Giant loyalty number.")
@click.option(
    "--outdir",
    default="giant_output",
    show_default=True,
    help="Directory for raw json and csv outputs.",
)
@click.option(
    "--sleep-seconds",
    default=1.5,
    show_default=True,
    type=float,
    help="Delay between order detail requests.",
)
def main(user_id, loyalty, outdir, sleep_seconds):
    """Legacy CLI entry point; delegates to run_collection()."""
    click.echo("legacy entrypoint: prefer collect_giant_web.py for data-model outputs")
    run_collection(user_id, loyalty, outdir, sleep_seconds)


def run_collection(
    user_id,
    loyalty,
    outdir,
    sleep_seconds,
    orders_filename="orders.csv",
    items_filename="items.csv",
):
    """Fetch unseen in-store orders and merge them into the CSV outputs.

    Credentials fall back to .env / environment values and finally to an
    interactive prompt.  Only orders whose ids are not already present in the
    orders CSV are fetched, with *sleep_seconds* between detail requests.
    """
    config = load_config()
    user_id = user_id or config["user_id"] or click.prompt("Giant user id", type=str)
    loyalty = loyalty or config["loyalty"] or click.prompt(
        "Giant loyalty number", type=str
    )

    outdir = Path(outdir)
    rawdir = outdir / "raw"
    rawdir.mkdir(parents=True, exist_ok=True)
    orders_csv = outdir / orders_filename
    items_csv = outdir / items_filename
    existing_order_ids = read_existing_order_ids(orders_csv)

    session = build_session()
    history = get_history(session, user_id, loyalty)
    history_path = rawdir / "history.json"
    write_json(history_path, history)

    records = history.get("records", [])
    click.echo(f"history returned {len(records)} visits; Giant exposes only the most recent 50")
    unseen_records = [
        record
        for record in records
        if stringify(record.get("orderId")) not in existing_order_ids
    ]
    click.echo(
        f"found {len(unseen_records)} unseen visits "
        f"({len(existing_order_ids)} already stored)"
    )

    details = []
    for index, record in enumerate(unseen_records, start=1):
        order_id = stringify(record.get("orderId"))
        click.echo(f"[{index}/{len(unseen_records)}] fetching {order_id}")
        detail = get_order_detail(session, user_id, order_id)
        write_json(rawdir / f"{order_id}.json", detail)
        details.append(detail)
        # Throttle between requests, but not after the last one.
        if index < len(unseen_records):
            time.sleep(sleep_seconds)

    orders, items = flatten_orders(history, details, history_path=history_path, raw_dir=rawdir)
    merged_orders = append_dedup(
        orders_csv,
        orders,
        subset=["order_id"],
        fieldnames=ORDER_FIELDS,
    )
    merged_items = append_dedup(
        items_csv,
        items,
        subset=["order_id", "line_no"],
        fieldnames=ITEM_FIELDS,
    )
    click.echo(
        f"wrote {len(orders)} new orders / {len(items)} new items "
        f"({len(merged_orders)} total orders, {len(merged_items)} total items)"
    )


if __name__ == "__main__":
    main()