Files
scrape-giant/scrape_costco.py

711 lines
21 KiB
Python

import os
import csv
import json
import time
import re
from pathlib import Path
from calendar import monthrange
from datetime import datetime, timedelta
from dotenv import load_dotenv
import click
from curl_cffi import requests
from browser_session import (
find_firefox_profile_dir,
load_firefox_cookies,
read_firefox_local_storage,
read_firefox_webapps_store,
)
BASE_URL = "https://ecom-api.costco.com/ebusiness/order/v1/orders/graphql"
RETAILER = "costco"
SUMMARY_QUERY = """
query receiptsWithCounts($startDate: String!, $endDate: String!, $documentType: String!, $documentSubType: String!) {
receiptsWithCounts(startDate: $startDate, endDate: $endDate, documentType: $documentType, documentSubType: $documentSubType) {
inWarehouse
gasStation
carWash
gasAndCarWash
receipts {
warehouseName
receiptType
documentType
transactionDateTime
transactionBarcode
warehouseName
transactionType
total
totalItemCount
itemArray {
itemNumber
}
tenderArray {
tenderTypeCode
tenderDescription
amountTender
}
couponArray {
upcnumberCoupon
}
}
}
}
""".strip()
DETAIL_QUERY = """
query receiptsWithCounts($barcode: String!, $documentType: String!) {
receiptsWithCounts(barcode: $barcode, documentType: $documentType) {
receipts {
warehouseName
receiptType
documentType
transactionDateTime
transactionDate
companyNumber
warehouseNumber
operatorNumber
warehouseShortName
registerNumber
transactionNumber
transactionType
transactionBarcode
total
warehouseAddress1
warehouseAddress2
warehouseCity
warehouseState
warehouseCountry
warehousePostalCode
totalItemCount
subTotal
taxes
total
invoiceNumber
sequenceNumber
itemArray {
itemNumber
itemDescription01
frenchItemDescription1
itemDescription02
frenchItemDescription2
itemIdentifier
itemDepartmentNumber
unit
amount
taxFlag
merchantID
entryMethod
transDepartmentNumber
fuelUnitQuantity
fuelGradeCode
itemUnitPriceAmount
fuelUomCode
fuelUomDescription
fuelUomDescriptionFr
fuelGradeDescription
fuelGradeDescriptionFr
}
tenderArray {
tenderTypeCode
tenderSubTypeCode
tenderDescription
amountTender
displayAccountNumber
sequenceNumber
approvalNumber
responseCode
tenderTypeName
transactionID
merchantID
entryMethod
tenderAcctTxnNumber
tenderAuthorizationCode
tenderTypeNameFr
tenderEntryMethodDescription
walletType
walletId
storedValueBucket
}
subTaxes {
tax1
tax2
tax3
tax4
aTaxPercent
aTaxLegend
aTaxAmount
aTaxPrintCode
aTaxPrintCodeFR
aTaxIdentifierCode
bTaxPercent
bTaxLegend
bTaxAmount
bTaxPrintCode
bTaxPrintCodeFR
bTaxIdentifierCode
cTaxPercent
cTaxLegend
cTaxAmount
cTaxIdentifierCode
dTaxPercent
dTaxLegend
dTaxAmount
dTaxPrintCode
dTaxPrintCodeFR
dTaxIdentifierCode
uTaxLegend
uTaxAmount
uTaxableAmount
}
instantSavings
membershipNumber
}
}
}
""".strip()
ORDER_FIELDS = [
"retailer",
"order_id",
"order_date",
"delivery_date",
"service_type",
"order_total",
"payment_method",
"total_item_count",
"total_savings",
"your_savings_total",
"coupons_discounts_total",
"store_name",
"store_number",
"store_address1",
"store_city",
"store_state",
"store_zipcode",
"refund_order",
"ebt_order",
"raw_history_path",
"raw_order_path",
]
ITEM_FIELDS = [
"retailer",
"order_id",
"line_no",
"order_date",
"retailer_item_id",
"pod_id",
"item_name",
"upc",
"category_id",
"category",
"qty",
"unit",
"unit_price",
"line_total",
"picked_weight",
"mvp_savings",
"reward_savings",
"coupon_savings",
"coupon_price",
"image_url",
"raw_order_path",
"is_discount_line",
"is_coupon_line",
]
COSTCO_STORAGE_ORIGIN = "costco.com"
COSTCO_ID_TOKEN_STORAGE_KEY = "idToken"
COSTCO_CLIENT_ID_STORAGE_KEY = "clientID"
def load_config():
load_dotenv()
return {
"authorization": os.getenv("COSTCO_X_AUTHORIZATION", "").strip(),
"client_id": os.getenv("COSTCO_X_WCS_CLIENTID", "").strip(),
"client_identifier": os.getenv("COSTCO_CLIENT_IDENTIFIER", "").strip(),
}
def build_headers(auth_headers):
headers = {
"accept": "*/*",
"content-type": "application/json-patch+json",
"costco.service": "restOrders",
"costco.env": "ecom",
"origin": "https://www.costco.com",
"referer": "https://www.costco.com/",
"user-agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) "
"Gecko/20100101 Firefox/148.0"
),
}
headers.update(auth_headers)
return headers
def load_costco_browser_headers(profile_dir, authorization, client_id, client_identifier):
local_storage = read_firefox_local_storage(profile_dir, COSTCO_STORAGE_ORIGIN)
webapps_store = read_firefox_webapps_store(profile_dir, COSTCO_STORAGE_ORIGIN)
auth_header = authorization.strip() if authorization else ""
if client_id:
client_id = client_id.strip()
if client_identifier:
client_identifier = client_identifier.strip()
if not auth_header:
id_token = (
local_storage.get(COSTCO_ID_TOKEN_STORAGE_KEY, "").strip()
or webapps_store.get(COSTCO_ID_TOKEN_STORAGE_KEY, "").strip()
)
if id_token:
auth_header = f"Bearer {id_token}"
client_id = client_id or (
local_storage.get(COSTCO_CLIENT_ID_STORAGE_KEY, "").strip()
or webapps_store.get(COSTCO_CLIENT_ID_STORAGE_KEY, "").strip()
)
if not auth_header:
raise click.ClickException(
"could not find Costco auth token; set COSTCO_X_AUTHORIZATION or load Firefox idToken"
)
if not client_id or not client_identifier:
raise click.ClickException(
"missing Costco client ids; set COSTCO_X_WCS_CLIENTID and COSTCO_CLIENT_IDENTIFIER"
)
return {
"costco-x-authorization": auth_header,
"costco-x-wcs-clientId": client_id,
"client-identifier": client_identifier,
}
def build_session(profile_dir, auth_headers):
session = requests.Session()
session.cookies.update(load_firefox_cookies(".costco.com", profile_dir))
session.headers.update(build_headers(auth_headers))
session.headers.update(auth_headers)
return session
def graphql_post(session, query, variables):
last_response = None
for attempt in range(3):
try:
response = session.post(
BASE_URL,
json={"query": query, "variables": variables},
impersonate="firefox",
timeout=30,
)
last_response = response
if response.status_code == 200:
return response.json()
click.echo(f"retry {attempt + 1}/3 status={response.status_code} body={response.text[:500]}")
except Exception as exc: # pragma: no cover - network error path
click.echo(f"retry {attempt + 1}/3 error={exc}")
time.sleep(3)
if last_response is not None:
last_response.raise_for_status()
raise RuntimeError("failed to fetch Costco GraphQL payload")
def safe_filename(value):
return re.sub(r'[<>:"/\\|?*]+', "-", str(value))
def summary_receipts(payload):
return payload.get("data", {}).get("receiptsWithCounts", {}).get("receipts", [])
def detail_receipts(payload):
return payload.get("data", {}).get("receiptsWithCounts", {}).get("receipts", [])
def summary_counts(payload):
counts = payload.get("data", {}).get("receiptsWithCounts", {})
return {
"inWarehouse": counts.get("inWarehouse", 0) or 0,
"gasStation": counts.get("gasStation", 0) or 0,
"carWash": counts.get("carWash", 0) or 0,
"gasAndCarWash": counts.get("gasAndCarWash", 0) or 0,
}
def parse_cli_date(value):
return datetime.strptime(value, "%m/%d/%Y").date()
def format_cli_date(value):
return f"{value.month}/{value.day:02d}/{value.year}"
def subtract_months(value, months):
year = value.year
month = value.month - months
while month <= 0:
month += 12
year -= 1
day = min(value.day, monthrange(year, month)[1])
return value.replace(year=year, month=month, day=day)
def resolve_date_range(months_back, today=None):
if months_back < 1:
raise click.ClickException("months-back must be at least 1")
end = today or datetime.now().date()
start = subtract_months(end, months_back)
return format_cli_date(start), format_cli_date(end)
def build_date_windows(start_date, end_date, window_days):
start = parse_cli_date(start_date)
end = parse_cli_date(end_date)
if end < start:
raise click.ClickException("end-date must be on or after start-date")
if window_days < 1:
raise click.ClickException("window-days must be at least 1")
windows = []
current = start
while current <= end:
window_end = min(current + timedelta(days=window_days - 1), end)
windows.append(
{
"startDate": format_cli_date(current),
"endDate": format_cli_date(window_end),
}
)
current = window_end + timedelta(days=1)
return windows
def unique_receipts(receipts):
by_barcode = {}
for receipt in receipts:
key = receipt_key(receipt)
if key:
by_barcode[key] = receipt
return list(by_barcode.values())
def receipt_key(receipt):
barcode = receipt.get("transactionBarcode", "")
transaction_date_time = receipt.get("transactionDateTime", "")
if not barcode:
return ""
return f"{barcode}::{transaction_date_time}"
def fetch_summary_windows(
session,
start_date,
end_date,
document_type,
document_sub_type,
window_days,
):
requests_metadata = []
combined_receipts = []
for window in build_date_windows(start_date, end_date, window_days):
variables = {
"startDate": window["startDate"],
"endDate": window["endDate"],
"text": "custom",
"documentType": document_type,
"documentSubType": document_sub_type,
}
payload = graphql_post(session, SUMMARY_QUERY, variables)
receipts = summary_receipts(payload)
counts = summary_counts(payload)
warehouse_count = sum(
1 for receipt in receipts if receipt.get("receiptType") == "In-Warehouse"
)
mismatch = counts["inWarehouse"] != warehouse_count
requests_metadata.append(
{
**variables,
"returnedReceipts": len(receipts),
"returnedInWarehouseReceipts": warehouse_count,
"inWarehouse": counts["inWarehouse"],
"gasStation": counts["gasStation"],
"carWash": counts["carWash"],
"gasAndCarWash": counts["gasAndCarWash"],
"countMismatch": mismatch,
}
)
if mismatch:
click.echo(
(
"warning: summary count mismatch for "
f"{window['startDate']} to {window['endDate']}: "
f"inWarehouse={counts['inWarehouse']} "
f"returnedInWarehouseReceipts={warehouse_count}"
),
err=True,
)
combined_receipts.extend(receipts)
unique = unique_receipts(combined_receipts)
aggregate_payload = {
"data": {
"receiptsWithCounts": {
"inWarehouse": sum(row["inWarehouse"] for row in requests_metadata),
"gasStation": sum(row["gasStation"] for row in requests_metadata),
"carWash": sum(row["carWash"] for row in requests_metadata),
"gasAndCarWash": sum(row["gasAndCarWash"] for row in requests_metadata),
"receipts": unique,
}
}
}
return aggregate_payload, requests_metadata
def flatten_costco_data(summary_payload, detail_payloads, raw_dir):
summary_lookup = {
receipt_key(receipt): receipt
for receipt in summary_receipts(summary_payload)
if receipt_key(receipt)
}
orders = []
items = []
for detail_payload in detail_payloads:
for receipt in detail_receipts(detail_payload):
order_id = receipt["transactionBarcode"]
receipt_id = receipt_key(receipt)
summary_row = summary_lookup.get(receipt_id, {})
coupon_numbers = {
row.get("upcnumberCoupon", "")
for row in summary_row.get("couponArray", []) or []
if row.get("upcnumberCoupon")
}
raw_order_path = raw_dir / f"{safe_filename(receipt_id or order_id)}.json"
orders.append(
{
"retailer": RETAILER,
"order_id": order_id,
"order_date": receipt.get("transactionDate", ""),
"delivery_date": receipt.get("transactionDate", ""),
"service_type": receipt.get("receiptType", ""),
"order_total": stringify(receipt.get("total")),
"payment_method": compact_join(
summary_row.get("tenderArray", []) or [], "tenderDescription"
),
"total_item_count": stringify(receipt.get("totalItemCount")),
"total_savings": stringify(receipt.get("instantSavings")),
"your_savings_total": stringify(receipt.get("instantSavings")),
"coupons_discounts_total": stringify(receipt.get("instantSavings")),
"store_name": receipt.get("warehouseName", ""),
"store_number": stringify(receipt.get("warehouseNumber")),
"store_address1": receipt.get("warehouseAddress1", ""),
"store_city": receipt.get("warehouseCity", ""),
"store_state": receipt.get("warehouseState", ""),
"store_zipcode": receipt.get("warehousePostalCode", ""),
"refund_order": "false",
"ebt_order": "false",
"raw_history_path": (raw_dir / "summary.json").as_posix(),
"raw_order_path": raw_order_path.as_posix(),
}
)
for line_no, item in enumerate(receipt.get("itemArray", []), start=1):
item_number = stringify(item.get("itemNumber"))
description = join_descriptions(
item.get("itemDescription01"), item.get("itemDescription02")
)
is_discount = is_discount_line(item)
is_coupon = is_discount and (
item_number in coupon_numbers
or description.startswith("/")
)
items.append(
{
"retailer": RETAILER,
"order_id": order_id,
"line_no": str(line_no),
"order_date": receipt.get("transactionDate", ""),
"retailer_item_id": item_number,
"pod_id": "",
"item_name": description,
"upc": "",
"category_id": stringify(item.get("itemDepartmentNumber")),
"category": stringify(item.get("transDepartmentNumber")),
"qty": stringify(item.get("unit")),
"unit": stringify(item.get("itemIdentifier")),
"unit_price": stringify(item.get("itemUnitPriceAmount")),
"line_total": stringify(item.get("amount")),
"picked_weight": "",
"mvp_savings": "",
"reward_savings": "",
"coupon_savings": stringify(item.get("amount") if is_coupon else ""),
"coupon_price": "",
"image_url": "",
"raw_order_path": raw_order_path.as_posix(),
"is_discount_line": "true" if is_discount else "false",
"is_coupon_line": "true" if is_coupon else "false",
}
)
return orders, items
def join_descriptions(*parts):
return " ".join(str(part).strip() for part in parts if part).strip()
def compact_join(rows, field):
values = [str(row.get(field, "")).strip() for row in rows if row.get(field)]
return " | ".join(values)
def is_discount_line(item):
amount = item.get("amount")
unit = item.get("unit")
description = join_descriptions(
item.get("itemDescription01"), item.get("itemDescription02")
)
try:
amount_val = float(amount)
except (TypeError, ValueError):
amount_val = 0.0
try:
unit_val = float(unit)
except (TypeError, ValueError):
unit_val = 0.0
return amount_val < 0 or unit_val < 0 or description.startswith("/")
def stringify(value):
if value is None:
return ""
return str(value)
def write_json(path, payload):
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
def write_csv(path, rows, fieldnames):
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
@click.command()
@click.option(
"--outdir",
default="costco_output",
show_default=True,
help="Output directory for Costco raw and flattened files.",
)
@click.option(
"--document-type",
default="all",
show_default=True,
help="Summary document type.",
)
@click.option(
"--document-sub-type",
default="all",
show_default=True,
help="Summary document sub type.",
)
@click.option(
"--window-days",
default=92,
show_default=True,
type=int,
help="Maximum number of days to request per summary window.",
)
@click.option(
"--months-back",
default=36,
show_default=True,
type=int,
help="How many months of receipts to enumerate back from today.",
)
@click.option(
"--firefox-profile-dir",
default=None,
help="Firefox profile directory to use for cookies and session storage.",
)
def main(
outdir,
document_type,
document_sub_type,
window_days,
months_back,
firefox_profile_dir,
):
outdir = Path(outdir)
raw_dir = outdir / "raw"
config = load_config()
profile_dir = Path(firefox_profile_dir) if firefox_profile_dir else None
if profile_dir is None:
try:
profile_dir = find_firefox_profile_dir()
except Exception:
profile_dir = click.prompt(
"Firefox profile dir",
type=click.Path(exists=True, file_okay=False, path_type=Path),
)
auth_headers = load_costco_browser_headers(
profile_dir,
authorization=config["authorization"],
client_id=config["client_id"],
client_identifier=config["client_identifier"],
)
session = build_session(profile_dir, auth_headers)
start_date, end_date = resolve_date_range(months_back)
summary_payload, request_metadata = fetch_summary_windows(
session,
start_date,
end_date,
document_type,
document_sub_type,
window_days,
)
write_json(raw_dir / "summary.json", summary_payload)
write_json(raw_dir / "summary_requests.json", request_metadata)
receipts = summary_receipts(summary_payload)
detail_payloads = []
for receipt in receipts:
barcode = receipt["transactionBarcode"]
receipt_id = receipt_key(receipt) or barcode
click.echo(f"fetching {barcode}")
detail_payload = graphql_post(
session,
DETAIL_QUERY,
{"barcode": barcode, "documentType": "warehouse"},
)
detail_payloads.append(detail_payload)
write_json(raw_dir / f"{safe_filename(receipt_id)}.json", detail_payload)
orders, items = flatten_costco_data(summary_payload, detail_payloads, raw_dir)
write_csv(outdir / "orders.csv", orders, ORDER_FIELDS)
write_csv(outdir / "items.csv", items, ITEM_FIELDS)
click.echo(f"wrote {len(orders)} orders and {len(items)} item rows to {outdir}")
if __name__ == "__main__":
main()