# Viewer metadata: 343 lines, 10 KiB, Python
import csv
|
|
import json
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
from dotenv import load_dotenv
|
|
import browser_cookie3
|
|
from curl_cffi import requests
|
|
import click
|
|
|
|
|
|
# Site root that every URL in this script is built from.
BASE = "https://giantfood.com"
# In-store purchase history page: sent as the referer and shown to the user.
ACCOUNT_PAGE = BASE + "/account/history/invoice/in-store"
|
|
|
|
# Column order of the per-order CSV (one row per order).
ORDER_FIELDS = [
    # identity and timing
    "order_id", "order_date", "delivery_date", "service_type",
    # totals and savings
    "order_total", "payment_method", "total_item_count",
    "total_savings", "your_savings_total", "coupons_discounts_total",
    # store
    "store_name", "store_number", "store_address1",
    "store_city", "store_state", "store_zipcode",
    # flags
    "refund_order", "ebt_order",
]
|
|
|
|
# Column order of the per-item CSV (one row per order line).
ITEM_FIELDS = [
    # which order and which line within it
    "order_id", "order_date", "line_no",
    # product identity
    "pod_id", "item_name", "upc", "category_id", "category",
    # quantity and pricing
    "qty", "unit", "unit_price", "line_total", "picked_weight",
    # savings
    "mvp_savings", "reward_savings", "coupon_savings", "coupon_price",
]
|
|
|
|
|
|
def load_config():
    """Return the Giant credentials found in the environment.

    Calls ``load_dotenv()`` first when that name is not ``None``, then reads
    ``GIANT_USER_ID`` and ``GIANT_LOYALTY_NUMBER``. Each value is stripped
    of surrounding whitespace; an unset variable yields ``""``.

    Returns:
        A dict with the keys ``"user_id"`` and ``"loyalty"``.
    """
    dotenv_available = load_dotenv is not None
    if dotenv_available:
        load_dotenv()

    def read_env(name):
        return os.getenv(name, "").strip()

    user_id = read_env("GIANT_USER_ID")
    loyalty = read_env("GIANT_LOYALTY_NUMBER")
    return {"user_id": user_id, "loyalty": loyalty}
|
|
|
|
|
|
def build_session():
    """Create an HTTP session that carries Firefox's giantfood.com cookies.

    The session also gets browser-like default headers, with the account
    history page as referer.

    Returns:
        The configured ``requests.Session`` (from ``curl_cffi``).
    """
    user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) "
        "Gecko/20100101 Firefox/148.0"
    )
    browser_headers = {
        "user-agent": user_agent,
        "accept": "application/json, text/plain, */*",
        "accept-language": "en-US,en;q=0.9",
        "referer": ACCOUNT_PAGE,
    }

    session = requests.Session()
    firefox_cookies = browser_cookie3.firefox(domain_name="giantfood.com")
    session.cookies.update(firefox_cookies)
    session.headers.update(browser_headers)
    return session
|
|
|
|
|
|
def safe_get(session, url, *, attempts=3, delay=3, **kwargs):
    """GET *url* through *session*, retrying on errors and non-200 replies.

    Args:
        session: Object exposing ``get(url, **kwargs)`` whose result has a
            ``status_code`` attribute and a ``raise_for_status()`` method.
        url: Address to fetch.
        attempts: Maximum number of tries (default 3).
        delay: Seconds to wait between tries (default 3). There is no wait
            after the final try.
        **kwargs: Passed through to ``session.get``. ``impersonate`` and
            ``timeout`` default to ``"firefox"`` and ``30`` but may be
            overridden by the caller.

    Returns:
        The first response whose ``status_code`` is 200.

    Raises:
        ValueError: If *attempts* is less than 1.
        RuntimeError: If no attempt succeeded and the last response's
            ``raise_for_status()`` did not raise. It is chained to the most
            recent request exception, if any.
        Exception: Whatever ``raise_for_status()`` raises for the last
            non-200 response.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")

    # Use setdefault so a caller-supplied timeout/impersonate overrides the
    # default instead of colliding as a duplicate keyword argument.
    kwargs.setdefault("impersonate", "firefox")
    kwargs.setdefault("timeout", 30)

    last_response = None
    last_error = None

    for attempt in range(1, attempts + 1):
        try:
            # Keep only the network call inside the try so that failures in
            # the reporting code below are never mistaken for network errors.
            response = session.get(url, **kwargs)
        except Exception as exc:  # pragma: no cover - network error path
            last_error = exc
            click.echo(f"retry {attempt}/{attempts} error={exc}")
        else:
            if response.status_code == 200:
                return response
            last_response = response
            click.echo(f"retry {attempt}/{attempts} status={response.status_code}")

        # Sleeping after the final attempt would only delay the failure.
        if attempt < attempts:
            time.sleep(delay)

    if last_response is not None:
        # Let the HTTP library report an error status with its own exception.
        last_response.raise_for_status()

    raise RuntimeError(f"failed to fetch {url}") from last_error
|
|
|
|
|
|
def get_history(session, user_id, loyalty):
    """Fetch the in-store order history for *user_id* and return its JSON.

    Args:
        session: HTTP session handed to ``safe_get``.
        user_id: Giant user id interpolated into the endpoint path.
        loyalty: Loyalty number sent as the ``loyaltyNumber`` query parameter.

    Returns:
        The decoded JSON body of the history response.
    """
    endpoint = f"{BASE}/api/v6.0/user/{user_id}/order/history"
    query = {"filter": "instore", "loyaltyNumber": loyalty}
    reply = safe_get(session, endpoint, params=query)
    return reply.json()
|
|
|
|
|
|
def get_order_detail(session, user_id, order_id):
    """Fetch the detail payload of one in-store order and return its JSON.

    Args:
        session: HTTP session handed to ``safe_get``.
        user_id: Giant user id interpolated into the endpoint path.
        order_id: Identifier of the order to look up.

    Returns:
        The decoded JSON body of the detail response.
    """
    endpoint = f"{BASE}/api/v6.0/user/{user_id}/order/history/detail/{order_id}"
    query = {"isInStore": "true"}
    reply = safe_get(session, endpoint, params=query)
    return reply.json()
|
|
|
|
|
|
def flatten_orders(history, details):
    """Flatten order-detail payloads into flat order rows and item rows.

    Args:
        history: History payload (a dict). Its ``records`` entries are used
            only to look up each order's ``serviceType``.
        details: Iterable of order-detail payloads (dicts), each of which
            must contain ``orderId``.

    Returns:
        A tuple ``(orders, items)``. ``orders`` holds one dict per detail;
        ``items`` holds one dict per entry of each detail's ``items`` list,
        numbered from 1 within its order (``line_no``, as a string).

    Raises:
        KeyError: If a history record or a detail has no ``orderId``.
    """
    # Key the lookup by str(orderId): the two payloads are not guaranteed to
    # agree on int vs str ids, and a mismatch would silently lose serviceType.
    # `or []` also covers an explicit JSON null.
    history_lookup = {
        str(record["orderId"]): record
        for record in (history.get("records") or [])
    }

    orders = []
    items = []

    for detail in details:
        order_id = str(detail["orderId"])
        order_date = detail.get("orderDate")
        history_row = history_lookup.get(order_id, {})
        # "pup" may be absent or null; either way the store columns are None.
        pickup = detail.get("pup") or {}

        orders.append(
            {
                "order_id": order_id,
                "order_date": order_date,
                "delivery_date": detail.get("deliveryDate"),
                "service_type": history_row.get("serviceType"),
                "order_total": detail.get("orderTotal"),
                "payment_method": detail.get("paymentMethod"),
                "total_item_count": detail.get("totalItemCount"),
                "total_savings": detail.get("totalSavings"),
                "your_savings_total": detail.get("yourSavingsTotal"),
                "coupons_discounts_total": detail.get("couponsDiscountsTotal"),
                "store_name": pickup.get("storeName"),
                "store_number": pickup.get("aholdStoreNumber"),
                "store_address1": pickup.get("storeAddress1"),
                "store_city": pickup.get("storeCity"),
                "store_state": pickup.get("storeState"),
                "store_zipcode": pickup.get("storeZipcode"),
                "refund_order": detail.get("refundOrder"),
                "ebt_order": detail.get("ebtOrder"),
            }
        )

        # A null "items" value is treated like an order without lines.
        for line_no, item in enumerate(detail.get("items") or [], start=1):
            items.append(
                {
                    "order_id": order_id,
                    "order_date": order_date,
                    "line_no": str(line_no),
                    "pod_id": item.get("podId"),
                    "item_name": item.get("itemName"),
                    "upc": item.get("primUpcCd"),
                    "category_id": item.get("categoryId"),
                    "category": item.get("categoryDesc"),
                    "qty": item.get("shipQy"),
                    "unit": item.get("lbEachCd"),
                    "unit_price": item.get("unitPrice"),
                    "line_total": item.get("groceryAmount"),
                    "picked_weight": item.get("totalPickedWeight"),
                    "mvp_savings": item.get("mvpSavings"),
                    "reward_savings": item.get("rewardSavings"),
                    "coupon_savings": item.get("couponSavings"),
                    "coupon_price": item.get("couponPrice"),
                }
            )

    return orders, items
|
|
|
|
|
|
def normalize_row(row, fieldnames):
    """Project *row* onto *fieldnames* with every value rendered as text.

    Keys of *row* that are not in *fieldnames* are dropped. Fields missing
    from *row* are passed to ``stringify`` as ``None``.
    """
    normalized = {}
    for name in fieldnames:
        normalized[name] = stringify(row.get(name))
    return normalized
|
|
|
|
|
|
def stringify(value):
    """Return ``str(value)``, mapping ``None`` to the empty string."""
    return "" if value is None else str(value)
|
|
|
|
|
|
def read_csv_rows(path):
    """Load the CSV at *path* as ``(fieldnames, rows)``.

    Args:
        path: A ``pathlib.Path``-like object (``exists()`` and ``open()``
            are used).

    Returns:
        ``(fieldnames, rows)``, where ``rows`` is a list of dicts from
        ``csv.DictReader``. A missing file yields ``([], [])``, and a file
        without a header row yields an empty ``fieldnames`` list.
    """
    if path.exists():
        with path.open(newline="", encoding="utf-8") as source:
            parsed = csv.DictReader(source)
            header = parsed.fieldnames
            records = list(parsed)
        return (header if header else []), records

    return [], []
|
|
|
|
|
|
def read_existing_order_ids(path):
    """Return the set of non-empty ``order_id`` values in the CSV at *path*."""
    rows = read_csv_rows(path)[1]
    known_ids = set()
    for row in rows:
        # Rows with a missing or blank order_id are skipped.
        if row.get("order_id"):
            known_ids.add(row["order_id"])
    return known_ids
|
|
|
|
|
|
def merge_rows(existing_rows, new_rows, subset):
    """Combine two row lists, de-duplicating on the *subset* columns.

    Rows are compared by the stringified values of the *subset* fields.
    When a key repeats, the later row replaces the earlier one but keeps
    the earlier row's position in the output.

    Args:
        existing_rows: List of row dicts already stored.
        new_rows: List of row dicts to merge in (these win on conflict).
        subset: Field names that together identify a row.

    Returns:
        A new list of shallow-copied row dicts.
    """
    by_key = {}
    for row in existing_rows + new_rows:
        identity = tuple(stringify(row.get(column)) for column in subset)
        # Assigning to a key that already exists keeps its insertion slot,
        # so the replacement stays where the first occurrence was.
        by_key[identity] = dict(row)
    return list(by_key.values())
|
|
|
|
|
|
def append_dedup(path, new_rows, subset, fieldnames):
    """Merge *new_rows* into the CSV at *path* and rewrite it atomically.

    Existing rows are read back and combined with *new_rows* through
    ``merge_rows`` (de-duplicated on *subset*, new rows winning). The result
    is written with a header that is the union of the file's current columns
    and *fieldnames*, existing columns first.

    The data is written to a sibling ``<name>.tmp`` file and then moved over
    *path*, so an interruption during the write cannot truncate the CSV that
    holds the rows accumulated by earlier runs.

    Args:
        path: ``pathlib.Path`` of the CSV file (created if missing).
        new_rows: Row dicts to add.
        subset: Field names that together identify a row.
        fieldnames: Columns the output must contain.

    Returns:
        The merged list of row dicts that was written.
    """
    existing_fieldnames, existing_rows = read_csv_rows(path)
    # dict.fromkeys de-duplicates while keeping first-seen column order.
    all_fieldnames = list(dict.fromkeys([*existing_fieldnames, *fieldnames]))

    merged = merge_rows(
        [normalize_row(row, all_fieldnames) for row in existing_rows],
        [normalize_row(row, all_fieldnames) for row in new_rows],
        subset=subset,
    )

    tmp_path = path.with_name(path.name + ".tmp")
    try:
        with tmp_path.open("w", newline="", encoding="utf-8") as handle:
            writer = csv.DictWriter(handle, fieldnames=all_fieldnames)
            writer.writeheader()
            writer.writerows(merged)
        # Swap in the new file only once it has been written completely.
        tmp_path.replace(path)
    finally:
        # After a successful replace the temp file is gone; this only
        # cleans up a partial file left behind by a failed write.
        if tmp_path.exists():
            tmp_path.unlink()

    return merged
|
|
|
|
|
|
def write_json(path, payload):
    """Serialize *payload* as 2-space-indented JSON and write it to *path*.

    The payload is fully serialized before the file is touched. The text is
    written as UTF-8, replacing any existing file content.
    """
    document = json.dumps(payload, indent=2)
    path.write_text(document, encoding="utf-8")
|
|
|
|
|
|
# CLI entry point: fetch the in-store order history, download details for
# orders not yet present in orders.csv, then merge them into the CSV outputs.
# NOTE: kept as comments rather than a docstring, because click would show a
# docstring as the command's --help text.
@click.command()
@click.option("--user-id", default=None, help="Giant user id.")
@click.option("--loyalty", default=None, help="Giant loyalty number.")
@click.option(
    "--outdir",
    default="giant_output",
    show_default=True,
    help="Directory for raw json and csv outputs.",
)
@click.option(
    "--sleep-seconds",
    default=1.5,
    show_default=True,
    type=float,
    help="Delay between order detail requests.",
)
def main(user_id, loyalty, outdir, sleep_seconds):
    # Credentials: command-line option first, then environment, then prompt.
    config = load_config()
    if not user_id:
        user_id = config["user_id"] or click.prompt("Giant user id", type=str)
    if not loyalty:
        loyalty = config["loyalty"] or click.prompt("Giant loyalty number", type=str)

    # Output layout: <outdir>/raw/*.json plus the two CSV files in <outdir>.
    outdir = Path(outdir)
    rawdir = outdir / "raw"
    rawdir.mkdir(parents=True, exist_ok=True)
    orders_csv = outdir / "orders.csv"
    items_csv = outdir / "items.csv"

    click.echo("Using cookies from your current Firefox profile.")
    click.echo(f"Open Giant here, confirm you're logged in, then return: {ACCOUNT_PAGE}")
    click.pause(info="Press any key once Giant is open and logged in")

    # Cookies are read only after the user confirmed the login above.
    session = build_session()

    click.echo("Fetching order history...")
    history = get_history(session, user_id, loyalty)
    write_json(rawdir / "history.json", history)

    records = history.get("records", [])
    click.echo(f"History returned {len(records)} visits.")
    click.echo(
        "Note: Giant appears to expose only the most recent 50 visits, "
        "so run this periodically if you want full continuity."
    )

    # Only orders that are not yet in orders.csv need a detail request.
    history_order_ids = [str(record["orderId"]) for record in records]
    existing_order_ids = read_existing_order_ids(orders_csv)
    new_order_ids = []
    for candidate in history_order_ids:
        if candidate not in existing_order_ids:
            new_order_ids.append(candidate)

    click.echo(f"Existing orders in csv: {len(existing_order_ids)}")
    click.echo(f"New orders to fetch: {len(new_order_ids)}")

    if not new_order_ids:
        click.echo("No new orders found. Done.")
        return

    details = []
    for order_id in new_order_ids:
        click.echo(f"Fetching {order_id}")
        detail = get_order_detail(session, user_id, order_id)
        details.append(detail)
        # Keep the raw payload next to the flattened CSV data.
        write_json(rawdir / f"{order_id}.json", detail)
        time.sleep(sleep_seconds)

    click.echo("Flattening new data...")
    orders, items = flatten_orders(history, details)

    all_orders = append_dedup(
        orders_csv,
        orders,
        subset=["order_id"],
        fieldnames=ORDER_FIELDS,
    )
    all_items = append_dedup(
        items_csv,
        items,
        subset=["order_id", "line_no", "item_name", "upc", "line_total"],
        fieldnames=ITEM_FIELDS,
    )

    summary = (
        "Done.",
        f"Orders csv: {orders_csv}",
        f"Items csv: {items_csv}",
        f"Total orders stored: {len(all_orders)}",
        f"Total item rows stored: {len(all_items)}",
    )
    for line in summary:
        click.echo(line)
|
|
|
|
|
|
# Run the command only when this file is executed as a script.
if __name__ == "__main__":
    main()
|