Files
scrape-giant/scrape_costco.py

465 lines
13 KiB
Python

import csv
import json
import os
from pathlib import Path
import click
from dotenv import load_dotenv
# GraphQL endpoint that graphql_post() sends every request to.
BASE_URL = "https://ecom-api.costco.com/ebusiness/order/v1/orders/graphql"
# Value written to the "retailer" column of every flattened order and item row.
RETAILER = "costco"
# GraphQL document for the receipt-summary request. It takes a date range plus
# a document type/sub-type and selects, per receipt, the barcode, totals, item
# numbers, tenders and coupon numbers, alongside the top-level inWarehouse /
# gasStation / carWash / gasAndCarWash fields (presumably counts; confirm).
# The text is sent verbatim at runtime and is .strip()ped once at import.
# NOTE(review): `warehouseName` is selected twice inside `receipts`; the second
# occurrence looks redundant. Confirm before removing it.
SUMMARY_QUERY = """
query receiptsWithCounts($startDate: String!, $endDate: String!, $documentType: String!, $documentSubType: String!) {
receiptsWithCounts(startDate: $startDate, endDate: $endDate, documentType: $documentType, documentSubType: $documentSubType) {
inWarehouse
gasStation
carWash
gasAndCarWash
receipts {
warehouseName
receiptType
documentType
transactionDateTime
transactionBarcode
warehouseName
transactionType
total
totalItemCount
itemArray {
itemNumber
}
tenderArray {
tenderTypeCode
tenderDescription
amountTender
}
couponArray {
upcnumberCoupon
}
}
}
}
""".strip()
# GraphQL document for the per-receipt detail request. It is keyed by a single
# transaction barcode plus a document type and selects the full receipt:
# warehouse address fields, totals, the line items (itemArray), the tenders
# (tenderArray), the tax breakdown (subTaxes), instantSavings and
# membershipNumber. The text is sent verbatim at runtime and is .strip()ped
# once at import.
# NOTE(review): `total` is selected twice inside `receipts`; the second
# occurrence looks redundant. Confirm before removing it.
DETAIL_QUERY = """
query receiptsWithCounts($barcode: String!, $documentType: String!) {
receiptsWithCounts(barcode: $barcode, documentType: $documentType) {
receipts {
warehouseName
receiptType
documentType
transactionDateTime
transactionDate
companyNumber
warehouseNumber
operatorNumber
warehouseShortName
registerNumber
transactionNumber
transactionType
transactionBarcode
total
warehouseAddress1
warehouseAddress2
warehouseCity
warehouseState
warehouseCountry
warehousePostalCode
totalItemCount
subTotal
taxes
total
invoiceNumber
sequenceNumber
itemArray {
itemNumber
itemDescription01
frenchItemDescription1
itemDescription02
frenchItemDescription2
itemIdentifier
itemDepartmentNumber
unit
amount
taxFlag
merchantID
entryMethod
transDepartmentNumber
fuelUnitQuantity
fuelGradeCode
itemUnitPriceAmount
fuelUomCode
fuelUomDescription
fuelUomDescriptionFr
fuelGradeDescription
fuelGradeDescriptionFr
}
tenderArray {
tenderTypeCode
tenderSubTypeCode
tenderDescription
amountTender
displayAccountNumber
sequenceNumber
approvalNumber
responseCode
tenderTypeName
transactionID
merchantID
entryMethod
tenderAcctTxnNumber
tenderAuthorizationCode
tenderTypeNameFr
tenderEntryMethodDescription
walletType
walletId
storedValueBucket
}
subTaxes {
tax1
tax2
tax3
tax4
aTaxPercent
aTaxLegend
aTaxAmount
aTaxPrintCode
aTaxPrintCodeFR
aTaxIdentifierCode
bTaxPercent
bTaxLegend
bTaxAmount
bTaxPrintCode
bTaxPrintCodeFR
bTaxIdentifierCode
cTaxPercent
cTaxLegend
cTaxAmount
cTaxIdentifierCode
dTaxPercent
dTaxLegend
dTaxAmount
dTaxPrintCode
dTaxPrintCodeFR
dTaxIdentifierCode
uTaxLegend
uTaxAmount
uTaxableAmount
}
instantSavings
membershipNumber
}
}
}
""".strip()
# Column order of orders.csv; main() passes this list to write_csv() as the
# DictWriter fieldnames. Each name matches a key of the order dicts built in
# flatten_costco_data().
ORDER_FIELDS = [
    "retailer",
    "order_id",
    "order_date",
    "delivery_date",
    "service_type",
    "order_total",
    "payment_method",
    "total_item_count",
    # The three savings columns are all filled from the receipt's
    # instantSavings value by flatten_costco_data().
    "total_savings",
    "your_savings_total",
    "coupons_discounts_total",
    "store_name",
    "store_number",
    "store_address1",
    "store_city",
    "store_state",
    "store_zipcode",
    # flatten_costco_data() always writes "false" for these two flags.
    "refund_order",
    "ebt_order",
    "raw_history_path",
    "raw_order_path",
]
# Column order of items.csv; main() passes this list to write_csv() as the
# DictWriter fieldnames. Each name matches a key of the item dicts built in
# flatten_costco_data(). Columns marked "always blank" are written as empty
# strings by that function.
ITEM_FIELDS = [
    "retailer",
    "order_id",
    "line_no",
    "order_date",
    "retailer_item_id",
    "pod_id",  # always blank
    "item_name",
    "upc",  # always blank
    "category_id",
    "category",
    "qty",
    "unit",
    "unit_price",
    "line_total",
    "picked_weight",  # always blank
    "mvp_savings",  # always blank
    "reward_savings",  # always blank
    "coupon_savings",
    "coupon_price",  # always blank
    "image_url",  # always blank
    "raw_order_path",
    "is_discount_line",
    "is_coupon_line",
]
def load_config():
    """Collect the Costco API credentials from the process environment.

    ``load_dotenv()`` runs first, so variables supplied through python-dotenv
    are visible to ``os.getenv``. Every value is stripped of surrounding
    whitespace; an unset variable yields an empty string.

    Returns:
        dict with the keys ``authorization``, ``client_id`` and
        ``client_identifier``.
    """
    load_dotenv()
    env_names = (
        ("authorization", "COSTCO_X_AUTHORIZATION"),
        ("client_id", "COSTCO_WCS_CLIENT_ID"),
        ("client_identifier", "COSTCO_CLIENT_IDENTIFIER"),
    )
    return {key: os.getenv(env_name, "").strip() for key, env_name in env_names}
def build_headers(config):
    """Build the HTTP headers sent with every Costco GraphQL request.

    Args:
        config: Mapping with the keys ``authorization``, ``client_id`` and
            ``client_identifier`` (the shape returned by ``load_config``).

    Returns:
        A new dict of header name -> value. Insertion order is: the fixed
        service headers, the three credential headers, then origin, referer
        and user-agent.

    Raises:
        KeyError: If one of the three config keys is missing.
    """
    firefox_user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) "
        "Gecko/20100101 Firefox/148.0"
    )
    headers = {
        "accept": "*/*",
        "content-type": "application/json-patch+json",
        "costco.service": "restOrders",
        "costco.env": "ecom",
    }
    # Credential headers come straight from the caller-supplied config.
    headers["costco-x-authorization"] = config["authorization"]
    headers["costco-x-wcs-clientId"] = config["client_id"]
    headers["client-identifier"] = config["client_identifier"]
    headers["origin"] = "https://www.costco.com"
    headers["referer"] = "https://www.costco.com/"
    headers["user-agent"] = firefox_user_agent
    return headers
def build_session(config):
    """Create a ``curl_cffi`` session preloaded with the Costco request headers.

    ``curl_cffi`` is imported inside the function, so the module can be
    imported without that package installed.

    Args:
        config: Credential mapping accepted by ``build_headers``.

    Returns:
        A ``curl_cffi.requests.Session`` whose default headers have been
        updated with ``build_headers(config)``.
    """
    from curl_cffi import requests as curl_requests

    costco_session = curl_requests.Session()
    default_headers = build_headers(config)
    costco_session.headers.update(default_headers)
    return costco_session
def graphql_post(session, query, variables):
    """POST one GraphQL document to ``BASE_URL`` and return the decoded JSON.

    Args:
        session: Object exposing ``post(url, json=..., impersonate=...,
            timeout=...)``, e.g. the session from ``build_session``.
        query: GraphQL document text.
        variables: Mapping of GraphQL variable values.

    Returns:
        Whatever ``response.json()`` returns for the reply body.

    Raises:
        Whatever ``response.raise_for_status()`` raises for the reply.

    Note:
        Only the HTTP status is checked; the body is not inspected for a
        GraphQL-level ``errors`` entry.
    """
    body = {"query": query, "variables": variables}
    reply = session.post(BASE_URL, json=body, impersonate="firefox", timeout=30)
    reply.raise_for_status()
    return reply.json()
def summary_receipts(payload):
    """Return the receipt list from a summary-query response.

    The list is read from ``payload["data"]["receiptsWithCounts"]["receipts"]``.
    Any level that is missing *or explicitly null* is treated as empty. A
    GraphQL error reply such as ``{"data": null, "errors": [...]}`` therefore
    yields ``[]`` instead of raising ``AttributeError`` or returning ``None``.

    Args:
        payload: Decoded JSON response (a dict), or ``None``.

    Returns:
        The list of receipt dicts, or ``[]`` when none are present.
    """
    data = (payload or {}).get("data") or {}
    counts = data.get("receiptsWithCounts") or {}
    return counts.get("receipts") or []
def detail_receipts(payload):
    """Return the receipt list from a detail-query response.

    The list is read from ``payload["data"]["receiptsWithCounts"]["receipts"]``.
    Any level that is missing *or explicitly null* is treated as empty. A
    GraphQL error reply such as ``{"data": null, "errors": [...]}`` therefore
    yields ``[]`` instead of raising ``AttributeError`` or returning ``None``.

    Args:
        payload: Decoded JSON response (a dict), or ``None``.

    Returns:
        The list of receipt dicts, or ``[]`` when none are present.
    """
    data = (payload or {}).get("data") or {}
    counts = data.get("receiptsWithCounts") or {}
    return counts.get("receipts") or []
def flatten_costco_data(summary_payload, detail_payloads, raw_dir):
    """Flatten raw Costco GraphQL payloads into order rows and item rows.

    Args:
        summary_payload: Decoded summary response. It is only used to look up
            each receipt's ``tenderArray`` (payment method) and
            ``couponArray`` (coupon numbers) by transaction barcode.
        detail_payloads: Iterable of decoded detail responses; every receipt
            in them becomes one order row plus one item row per line item.
        raw_dir: ``pathlib.Path``-like directory used only to build the
            ``raw_history_path`` / ``raw_order_path`` column values.

    Returns:
        ``(orders, items)``: two lists of dicts whose keys match
        ``ORDER_FIELDS`` and ``ITEM_FIELDS`` respectively. All values are
        strings except ``order_date`` / ``delivery_date`` and the store text
        fields, which are passed through as received.

    Raises:
        KeyError: If a summary or detail receipt has no
            ``transactionBarcode``.
    """
    summary_lookup = {
        receipt["transactionBarcode"]: receipt
        for receipt in summary_receipts(summary_payload)
    }
    # Identical for every order, so build it once instead of per receipt.
    raw_history_path = (raw_dir / "summary.json").as_posix()
    orders = []
    items = []
    for detail_payload in detail_payloads:
        for receipt in detail_receipts(detail_payload):
            order_id = receipt["transactionBarcode"]
            summary_row = summary_lookup.get(order_id, {})
            # Stringified so the membership test below compares like with
            # like: item numbers also go through stringify().
            coupon_numbers = {
                stringify(row.get("upcnumberCoupon"))
                for row in summary_row.get("couponArray", []) or []
                if row.get("upcnumberCoupon")
            }
            raw_order_path = (raw_dir / f"{order_id}.json").as_posix()
            order_date = receipt.get("transactionDate", "")
            # The receipt exposes a single savings figure; it feeds all three
            # savings columns.
            instant_savings = stringify(receipt.get("instantSavings"))
            orders.append(
                {
                    "retailer": RETAILER,
                    "order_id": order_id,
                    "order_date": order_date,
                    "delivery_date": order_date,
                    "service_type": receipt.get("receiptType", ""),
                    "order_total": stringify(receipt.get("total")),
                    "payment_method": compact_join(
                        summary_row.get("tenderArray", []) or [], "tenderDescription"
                    ),
                    "total_item_count": stringify(receipt.get("totalItemCount")),
                    "total_savings": instant_savings,
                    "your_savings_total": instant_savings,
                    "coupons_discounts_total": instant_savings,
                    "store_name": receipt.get("warehouseName", ""),
                    "store_number": stringify(receipt.get("warehouseNumber")),
                    "store_address1": receipt.get("warehouseAddress1", ""),
                    "store_city": receipt.get("warehouseCity", ""),
                    "store_state": receipt.get("warehouseState", ""),
                    "store_zipcode": receipt.get("warehousePostalCode", ""),
                    "refund_order": "false",
                    "ebt_order": "false",
                    "raw_history_path": raw_history_path,
                    "raw_order_path": raw_order_path,
                }
            )
            # `or []` covers an explicit null itemArray, matching the guards
            # already applied to couponArray and tenderArray above.
            line_items = receipt.get("itemArray") or []
            for line_no, item in enumerate(line_items, start=1):
                item_number = stringify(item.get("itemNumber"))
                description = join_descriptions(
                    item.get("itemDescription01"), item.get("itemDescription02")
                )
                is_discount = is_discount_line(item)
                # A coupon line is a discount line that either matches a
                # coupon number from the summary or has a "/"-prefixed
                # description.
                is_coupon = is_discount and (
                    item_number in coupon_numbers
                    or description.startswith("/")
                )
                items.append(
                    {
                        "retailer": RETAILER,
                        "order_id": order_id,
                        "line_no": str(line_no),
                        "order_date": order_date,
                        "retailer_item_id": item_number,
                        "pod_id": "",
                        "item_name": description,
                        "upc": "",
                        "category_id": stringify(item.get("itemDepartmentNumber")),
                        "category": stringify(item.get("transDepartmentNumber")),
                        "qty": stringify(item.get("unit")),
                        "unit": stringify(item.get("itemIdentifier")),
                        "unit_price": stringify(item.get("itemUnitPriceAmount")),
                        "line_total": stringify(item.get("amount")),
                        "picked_weight": "",
                        "mvp_savings": "",
                        "reward_savings": "",
                        "coupon_savings": stringify(
                            item.get("amount") if is_coupon else ""
                        ),
                        "coupon_price": "",
                        "image_url": "",
                        "raw_order_path": raw_order_path,
                        "is_discount_line": "true" if is_discount else "false",
                        "is_coupon_line": "true" if is_coupon else "false",
                    }
                )
    return orders, items
def join_descriptions(*parts):
    """Join description fragments into one space-separated string.

    Falsy parts (``None``, ``""``, ``0``) are skipped. Each remaining part is
    converted with ``str`` and stripped. Parts that are empty *after*
    stripping are dropped as well, so a whitespace-only fragment in the middle
    no longer leaves a double space in the result.

    Args:
        *parts: Description fragments, typically strings or ``None``.

    Returns:
        The joined text, or ``""`` when nothing usable was supplied.
    """
    stripped = (str(part).strip() for part in parts if part)
    return " ".join(text for text in stripped if text)
def compact_join(rows, field):
    """Join one field from a list of dicts into a ``" | "``-separated string.

    Rows whose value for *field* is missing or falsy are skipped. Values that
    are blank after stripping are skipped too, so whitespace-only entries do
    not produce empty segments such as ``"A |  | B"``.

    Args:
        rows: Iterable of dict-like rows.
        field: Key to read from each row.

    Returns:
        The stripped values joined by ``" | "``, or ``""`` if there are none.
    """
    values = []
    for row in rows:
        raw = row.get(field, "")
        if not raw:
            continue
        text = str(raw).strip()
        if text:
            values.append(text)
    return " | ".join(values)
def is_discount_line(item):
    """Tell whether a receipt line item is a discount line.

    A line counts as a discount when its ``amount`` is negative, its ``unit``
    is negative, or its joined description (``itemDescription01`` +
    ``itemDescription02``) starts with ``"/"``. Values that cannot be
    converted to ``float`` (including ``None``) are treated as ``0.0``.

    Args:
        item: Dict-like line item from a detail receipt.

    Returns:
        ``True`` for a discount line, ``False`` otherwise.
    """
    label = join_descriptions(
        item.get("itemDescription01"), item.get("itemDescription02")
    )
    numbers = []
    for key in ("amount", "unit"):
        try:
            parsed = float(item.get(key))
        except (TypeError, ValueError):
            parsed = 0.0
        numbers.append(parsed)
    amount_val, unit_val = numbers
    if amount_val < 0 or unit_val < 0:
        return True
    return label.startswith("/")
def stringify(value):
    """Convert *value* to text for a CSV cell, mapping ``None`` to ``""``.

    Every other value, including falsy ones such as ``0`` or ``False``, is
    passed through ``str``.
    """
    return "" if value is None else str(value)
def write_json(path, payload):
    """Serialise *payload* as 2-space-indented JSON and write it to *path*.

    Missing parent directories are created first. The file is written as
    UTF-8 and replaces any existing content.

    Args:
        path: ``pathlib.Path`` of the target file.
        payload: JSON-serialisable object.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2)
    path.write_text(serialized, encoding="utf-8")
def write_csv(path, rows, fieldnames):
    """Write dict rows to a UTF-8 CSV file with a header line.

    Missing parent directories are created first. Columns appear in the order
    given by *fieldnames*.

    Args:
        path: ``pathlib.Path`` of the target file.
        rows: Iterable of dicts keyed by column name.
        fieldnames: Sequence of column names passed to ``csv.DictWriter``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" lets the csv module control line endings itself.
    with path.open("w", newline="", encoding="utf-8") as out_file:
        csv_writer = csv.DictWriter(out_file, fieldnames=fieldnames)
        csv_writer.writeheader()
        for record in rows:
            csv_writer.writerow(record)
# CLI entry point: fetch the receipt summary for a date range, fetch the
# detail payload for every receipt in it, store all raw JSON under
# <outdir>/raw, and write the flattened orders.csv / items.csv into <outdir>.
# This description is a comment rather than a docstring on purpose: click
# would display a docstring as the command's --help text.
@click.command()
@click.option("--start-date", required=True, help="Start date like 1/01/2026.")
@click.option("--end-date", required=True, help="End date like 3/31/2026.")
@click.option(
    "--outdir",
    default="costco_output",
    show_default=True,
    help="Output directory for Costco raw and flattened files.",
)
@click.option(
    "--document-type",
    default="all",
    show_default=True,
    help="Summary document type.",
)
@click.option(
    "--document-sub-type",
    default="all",
    show_default=True,
    help="Summary document sub type.",
)
def main(start_date, end_date, outdir, document_type, document_sub_type):
    config = load_config()
    # Refuse to run without all three credentials; name the missing ones.
    missing = [
        key
        for key in ("authorization", "client_id", "client_identifier")
        if not config[key]
    ]
    if missing:
        missing_names = ", ".join(missing)
        raise click.ClickException(f"missing Costco auth config: {missing_names}")

    outdir = Path(outdir)
    raw_dir = outdir / "raw"
    session = build_session(config)

    # NOTE(review): "text" is sent although SUMMARY_QUERY declares no $text
    # variable; kept as-is. Confirm whether the endpoint needs it.
    summary_variables = {
        "startDate": start_date,
        "endDate": end_date,
        "text": "custom",
        "documentType": document_type,
        "documentSubType": document_sub_type,
    }
    summary_payload = graphql_post(session, SUMMARY_QUERY, summary_variables)
    write_json(raw_dir / "summary.json", summary_payload)

    # One detail request per receipt; each raw reply is saved as soon as it
    # arrives, before the next request is made.
    detail_payloads = []
    for summary_row in summary_receipts(summary_payload):
        barcode = summary_row["transactionBarcode"]
        click.echo(f"fetching {barcode}")
        detail_variables = {"barcode": barcode, "documentType": "warehouse"}
        detail_payload = graphql_post(session, DETAIL_QUERY, detail_variables)
        detail_payloads.append(detail_payload)
        write_json(raw_dir / f"{barcode}.json", detail_payload)

    orders, items = flatten_costco_data(summary_payload, detail_payloads, raw_dir)
    write_csv(outdir / "orders.csv", orders, ORDER_FIELDS)
    write_csv(outdir / "items.csv", items, ITEM_FIELDS)
    click.echo(f"wrote {len(orders)} orders and {len(items)} item rows to {outdir}")


if __name__ == "__main__":
    main()