From d57b9cf52f2066eea5cf42d23828a98b154a480f Mon Sep 17 00:00:00 2001 From: ben Date: Sat, 14 Mar 2026 18:32:32 -0400 Subject: [PATCH] Harden giant receipt fetch CLI --- agents.md | 23 +++ requirements.txt | Bin 334 -> 149 bytes scrape-click.py | 251 +----------------------- scraper.py | 387 +++++++++++++++++++++++++++----------- tests/test_bc.py | 37 ++-- tests/test_bc_cffi.py | 36 ++-- tests/test_giant_login.py | 75 ++------ tests/test_scraper.py | 117 ++++++++++++ 8 files changed, 456 insertions(+), 470 deletions(-) create mode 100644 agents.md create mode 100644 tests/test_scraper.py diff --git a/agents.md b/agents.md new file mode 100644 index 0000000..6f3ac2d --- /dev/null +++ b/agents.md @@ -0,0 +1,23 @@ +# agent rules + +## priorities +- optimize for simplicity, boringness, and long-term maintainability +- prefer minimal diffs; avoid refactors unless required for the active task + +## tech stack +- python; pandas or polars +- file storage: json and csv, no sqlite or databases +- do not add new dependencies unless explicitly approved; if unavoidable, document justification in the active task notes + +## workflow +- prefer direct argv commands (no bash -lc / compound shell chains) unless necessary +- work on ONE task at a time unless explicitly instructed otherwise +- at the start of work, state the task id you are executing +- do not start work unless a task id is specified; if missing, choose the earliest unchecked task and say so +- propose incremental steps +- always include basic tests for core logic +- when you complete a task: + - mark it [x] in pm/tasks.md + - fill in evidence with commit hash + commands run + - never mark complete unless acceptance criteria are met + - include date and time (HH:MM) diff --git a/requirements.txt b/requirements.txt index 35a8f3f00d893eff071755c9941e07c5a19209fa..caa6c82e685a0ff9aad7ec8402e6b4ceb7ce490c 100644 GIT binary patch literal 149 zcmXAiK@I{j2u1h3iv(%M=z_BtMJB0JZG&U9o?fcU_aFR!w)%RgZKT#EYh*?sgl!PD zHP4xukaj>oQ;_+`fPg_tmW6o_aoj$w9RJo|lk2N$yK5h92gmYB%n1@qp>=5|^F`+9 Q#?g9P>!sfGJt7+E7v|n8#sB~S literal 334 zcmYk2O$x#=5QX1b@FlnKya|%MPCW z?=hk}sv~FSNeS+aw-$F(SfjOj74+$7lCp0PXK;=s1}%SyiB=KxLaQx)0~-4SQ&`ex X8{0F`^xHG(ksCBQxg_22+V^+^PAoDY diff --git a/scrape-click.py b/scrape-click.py index fdf34dd..577bd3e 100644 --- a/scrape-click.py +++ b/scrape-click.py @@ -1,253 +1,4 @@ -import json -import time -from pathlib import Path - -import browser_cookie3 -import click -import pandas as pd -from curl_cffi import requests -from dotenv import load_dotenv -import os - - -BASE = "https://giantfood.com" -ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store" - - -def load_config(): - load_dotenv() - return { - "user_id": os.getenv("GIANT_USER_ID", "").strip(), - "loyalty": os.getenv("GIANT_LOYALTY_NUMBER", "").strip(), - } - - -def build_session(): - s = requests.Session() - s.cookies.update(browser_cookie3.firefox(domain_name="giantfood.com")) - s.headers.update({ - "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) Gecko/20100101 Firefox/148.0", - "accept": "application/json, text/plain, */*", - "accept-language": "en-US,en;q=0.9", - "referer": ACCOUNT_PAGE, - }) - return s - - -def safe_get(session, url, **kwargs): - last_response = None - - for attempt in range(3): - try: - r = session.get( - url, - impersonate="firefox", - timeout=30, - **kwargs, - ) - last_response = r - - if r.status_code == 200: - return r - - click.echo(f"retry {attempt + 1}/3 status={r.status_code}") - except Exception as e: - click.echo(f"retry {attempt + 1}/3 error={e}") - - time.sleep(3) - - if last_response is not None: - last_response.raise_for_status() - - raise RuntimeError(f"failed to fetch {url}") - - -def get_history(session, user_id, loyalty): - url = f"{BASE}/api/v6.0/user/{user_id}/order/history" - r = safe_get( - session, - url, - params={ - "filter": "instore", - "loyaltyNumber": loyalty, - }, - ) - return r.json() - - -def get_order_detail(session, user_id, order_id): - url = f"{BASE}/api/v6.0/user/{user_id}/order/history/detail/{order_id}" - r = safe_get( - session, - url, - params={"isInStore": "true"}, - ) - return r.json() - - -def flatten_orders(history, details): - orders = [] - items = [] - - history_lookup = { - r["orderId"]: r - for r in history.get("records", []) - } - - for d in details: - hist = history_lookup.get(d["orderId"], {}) - pup = d.get("pup", {}) - - orders.append({ - "order_id": d["orderId"], - "order_date": d.get("orderDate"), - "delivery_date": d.get("deliveryDate"), - "service_type": hist.get("serviceType"), - "order_total": d.get("orderTotal"), - "payment_method": d.get("paymentMethod"), - "total_item_count": d.get("totalItemCount"), - "total_savings": d.get("totalSavings"), - "your_savings_total": d.get("yourSavingsTotal"), - "coupons_discounts_total": d.get("couponsDiscountsTotal"), - "store_name": pup.get("storeName"), - "store_number": pup.get("aholdStoreNumber"), - "store_address1": pup.get("storeAddress1"), - "store_city": pup.get("storeCity"), - "store_state": pup.get("storeState"), - "store_zipcode": pup.get("storeZipcode"), - "refund_order": d.get("refundOrder"), - "ebt_order": d.get("ebtOrder"), - }) - - for i, item in enumerate(d.get("items", []), start=1): - items.append({ - "order_id": d["orderId"], - "order_date": d.get("orderDate"), - "line_no": i, - "pod_id": item.get("podId"), - "item_name": item.get("itemName"), - "upc": item.get("primUpcCd"), - "category_id": item.get("categoryId"), - "category": item.get("categoryDesc"), - "qty": item.get("shipQy"), - "unit": item.get("lbEachCd"), - "unit_price": item.get("unitPrice"), - "line_total": item.get("groceryAmount"), - "picked_weight": item.get("totalPickedWeight"), - "mvp_savings": item.get("mvpSavings"), - "reward_savings": item.get("rewardSavings"), - "coupon_savings": item.get("couponSavings"), - "coupon_price": item.get("couponPrice"), - }) - - return pd.DataFrame(orders), pd.DataFrame(items) - - -def read_existing_order_ids(orders_csv: Path) -> set[str]: - if not orders_csv.exists(): - return set() - - try: - df = pd.read_csv(orders_csv, dtype={"order_id": str}) - if "order_id" not in df.columns: - return set() - return set(df["order_id"].dropna().astype(str)) - except Exception: - return set() - - -def append_dedup(existing_path: Path, new_df: pd.DataFrame, subset: list[str]) -> pd.DataFrame: - if existing_path.exists(): - old_df = pd.read_csv(existing_path, dtype=str) - combined = pd.concat([old_df, new_df.astype(str)], ignore_index=True) - else: - combined = new_df.astype(str).copy() - - combined = combined.drop_duplicates(subset=subset, keep="last") - combined.to_csv(existing_path, index=False) - return combined - - -@click.command() -@click.option("--user-id", default=None, help="giant user id") -@click.option("--loyalty", default=None, help="giant loyalty number") -@click.option("--outdir", default="giant_output", show_default=True, help="output directory") -@click.option("--sleep-seconds", default=1.5, show_default=True, type=float, help="delay between detail requests") -def main(user_id, loyalty, outdir, sleep_seconds): - cfg = load_config() - - user_id = user_id or cfg["user_id"] or click.prompt("giant user id", type=str) - loyalty = loyalty or cfg["loyalty"] or click.prompt("giant loyalty number", type=str) - - outdir = Path(outdir) - rawdir = outdir / "raw" - rawdir.mkdir(parents=True, exist_ok=True) - - orders_csv = outdir / "orders.csv" - items_csv = outdir / "items.csv" - - click.echo("using cookies from your current firefox profile.") - click.echo(f"open giant here, make sure you're logged in, then return: {ACCOUNT_PAGE}") - click.pause(info="press any key once giant is open and logged in") - - session = build_session() - - click.echo("fetching order history...") - history = get_history(session, user_id, loyalty) - - (rawdir / "history.json").write_text( - json.dumps(history, indent=2), - encoding="utf-8", - ) - - records = history.get("records", []) - click.echo(f"history returned {len(records)} visits") - click.echo("tip: giant appears to expose only the most recent 50 visits, so run this periodically if you want full continuity.") - - history_order_ids = [str(r["orderId"]) for r in records] - existing_order_ids = read_existing_order_ids(orders_csv) - new_order_ids = [oid for oid in history_order_ids if oid not in existing_order_ids] - - click.echo(f"existing orders in csv: {len(existing_order_ids)}") - click.echo(f"new orders to fetch: {len(new_order_ids)}") - - if not new_order_ids: - click.echo("no new orders found. done.") - return - - details = [] - for order_id in new_order_ids: - click.echo(f"fetching {order_id}") - d = get_order_detail(session, user_id, order_id) - details.append(d) - - (rawdir / f"{order_id}.json").write_text( - json.dumps(d, indent=2), - encoding="utf-8", - ) - - time.sleep(sleep_seconds) - - click.echo("flattening new data...") - orders_df, items_df = flatten_orders(history, details) - - orders_all = append_dedup( - orders_csv, - orders_df, - subset=["order_id"], - ) - - items_all = append_dedup( - items_csv, - items_df, - subset=["order_id", "line_no", "item_name", "upc", "line_total"], - ) - - click.echo("done") - click.echo(f"orders csv: {orders_csv}") - click.echo(f"items csv: {items_csv}") - click.echo(f"total orders stored: {len(orders_all)}") - click.echo(f"total item rows stored: {len(items_all)}") +from scraper import main if __name__ == "__main__": diff --git a/scraper.py b/scraper.py index fe7c57f..da588fd 100644 --- a/scraper.py +++ b/scraper.py @@ -1,29 +1,88 @@ +import csv import json +import os import time from pathlib import Path - import browser_cookie3 -import pandas as pd from curl_cffi import requests +import click BASE = "https://giantfood.com" ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store" -USER_ID = "369513017" -LOYALTY = "440155630880" +ORDER_FIELDS = [ + "order_id", + "order_date", + "delivery_date", + "service_type", + "order_total", + "payment_method", + "total_item_count", + "total_savings", + "your_savings_total", + "coupons_discounts_total", + "store_name", + "store_number", + "store_address1", + "store_city", + "store_state", + "store_zipcode", + "refund_order", + "ebt_order", +] + +ITEM_FIELDS = [ + "order_id", + "order_date", + "line_no", + "pod_id", + "item_name", + "upc", + "category_id", + "category", + "qty", + "unit", + "unit_price", + "line_total", + "picked_weight", + "mvp_savings", + "reward_savings", + "coupon_savings", + "coupon_price", +] + + +def load_config(): + try: + from dotenv import load_dotenv + except ImportError: + load_dotenv = None + + if load_dotenv is not None: + load_dotenv() + + return { + "user_id": os.getenv("GIANT_USER_ID", "").strip(), + "loyalty": os.getenv("GIANT_LOYALTY_NUMBER", "").strip(), + } def build_session(): - s = requests.Session() - s.cookies.update(browser_cookie3.firefox(domain_name="giantfood.com")) - s.headers.update({ - "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) Gecko/20100101 Firefox/148.0", - "accept": "application/json, text/plain, */*", - "accept-language": "en-US,en;q=0.9", - "referer": ACCOUNT_PAGE, - }) - return s + session = requests.Session() + session.cookies.update(browser_cookie3.firefox(domain_name="giantfood.com")) + session.headers.update( + { + "user-agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) " + "Gecko/20100101 Firefox/148.0" + ), + "accept": "application/json, text/plain, */*", + "accept-language": "en-US,en;q=0.9", + "referer": ACCOUNT_PAGE, + } + ) + return session def safe_get(session, url, **kwargs): @@ -31,20 +90,20 @@ def safe_get(session, url, **kwargs): for attempt in range(3): try: - r = session.get( + response = session.get( url, impersonate="firefox", timeout=30, **kwargs, ) - last_response = r + last_response = response - if r.status_code == 200: - return r + if response.status_code == 200: + return response - print(f"retry {attempt + 1}/3 status={r.status_code}") - except Exception as e: - print(f"retry {attempt + 1}/3 error={e}") + click.echo(f"retry {attempt + 1}/3 status={response.status_code}") + except Exception as exc: # pragma: no cover - network error path + click.echo(f"retry {attempt + 1}/3 error={exc}") time.sleep(3) @@ -54,128 +113,234 @@ def safe_get(session, url, **kwargs): raise RuntimeError(f"failed to fetch {url}") -def get_history(session): - url = f"{BASE}/api/v6.0/user/{USER_ID}/order/history" - r = safe_get( +def get_history(session, user_id, loyalty): + response = safe_get( session, - url, - params={ - "filter": "instore", - "loyaltyNumber": LOYALTY, - }, + f"{BASE}/api/v6.0/user/{user_id}/order/history", + params={"filter": "instore", "loyaltyNumber": loyalty}, ) - return r.json() + return response.json() -def get_order_detail(session, order_id): - url = f"{BASE}/api/v6.0/user/{USER_ID}/order/history/detail/{order_id}" - r = safe_get( +def get_order_detail(session, user_id, order_id): + response = safe_get( session, - url, + f"{BASE}/api/v6.0/user/{user_id}/order/history/detail/{order_id}", params={"isInStore": "true"}, ) - return r.json() + return response.json() def flatten_orders(history, details): orders = [] items = [] + history_lookup = {record["orderId"]: record for record in history.get("records", [])} - history_lookup = { - r["orderId"]: r - for r in history.get("records", []) - } + for detail in details: + order_id = str(detail["orderId"]) + history_row = history_lookup.get(detail["orderId"], {}) + pickup = detail.get("pup", {}) - for d in details: - hist = history_lookup.get(d["orderId"], {}) - pup = d.get("pup", {}) + orders.append( + { + "order_id": order_id, + "order_date": detail.get("orderDate"), + "delivery_date": detail.get("deliveryDate"), + "service_type": history_row.get("serviceType"), + "order_total": detail.get("orderTotal"), + "payment_method": detail.get("paymentMethod"), + "total_item_count": detail.get("totalItemCount"), + "total_savings": detail.get("totalSavings"), + "your_savings_total": detail.get("yourSavingsTotal"), + "coupons_discounts_total": detail.get("couponsDiscountsTotal"), + "store_name": pickup.get("storeName"), + "store_number": pickup.get("aholdStoreNumber"), + "store_address1": pickup.get("storeAddress1"), + "store_city": pickup.get("storeCity"), + "store_state": pickup.get("storeState"), + "store_zipcode": pickup.get("storeZipcode"), + "refund_order": detail.get("refundOrder"), + "ebt_order": detail.get("ebtOrder"), + } + ) - orders.append({ - "order_id": d["orderId"], - "order_date": d.get("orderDate"), - "delivery_date": d.get("deliveryDate"), - "service_type": hist.get("serviceType"), - "order_total": d.get("orderTotal"), - "payment_method": d.get("paymentMethod"), - "total_item_count": d.get("totalItemCount"), - "total_savings": d.get("totalSavings"), - "your_savings_total": d.get("yourSavingsTotal"), - "coupons_discounts_total": d.get("couponsDiscountsTotal"), - "store_name": pup.get("storeName"), - "store_number": pup.get("aholdStoreNumber"), - "store_address1": pup.get("storeAddress1"), - "store_city": pup.get("storeCity"), - "store_state": pup.get("storeState"), - "store_zipcode": pup.get("storeZipcode"), - "refund_order": d.get("refundOrder"), - "ebt_order": d.get("ebtOrder"), - }) + for line_no, item in enumerate(detail.get("items", []), start=1): + items.append( + { + "order_id": order_id, + "order_date": detail.get("orderDate"), + "line_no": str(line_no), + "pod_id": item.get("podId"), + "item_name": item.get("itemName"), + "upc": item.get("primUpcCd"), + "category_id": item.get("categoryId"), + "category": item.get("categoryDesc"), + "qty": item.get("shipQy"), + "unit": item.get("lbEachCd"), + "unit_price": item.get("unitPrice"), + "line_total": item.get("groceryAmount"), + "picked_weight": item.get("totalPickedWeight"), + "mvp_savings": item.get("mvpSavings"), + "reward_savings": item.get("rewardSavings"), + "coupon_savings": item.get("couponSavings"), + "coupon_price": item.get("couponPrice"), + } + ) - for i, item in enumerate(d.get("items", []), start=1): - items.append({ - "order_id": d["orderId"], - "order_date": d.get("orderDate"), - "line_no": i, - "pod_id": item.get("podId"), - "item_name": item.get("itemName"), - "upc": item.get("primUpcCd"), - "category_id": item.get("categoryId"), - "category": item.get("categoryDesc"), - "qty": item.get("shipQy"), - "unit": item.get("lbEachCd"), - "unit_price": item.get("unitPrice"), - "line_total": item.get("groceryAmount"), - "picked_weight": item.get("totalPickedWeight"), - "mvp_savings": item.get("mvpSavings"), - "reward_savings": item.get("rewardSavings"), - "coupon_savings": item.get("couponSavings"), - "coupon_price": item.get("couponPrice"), - }) - - return pd.DataFrame(orders), pd.DataFrame(items) + return orders, items -def main(): - outdir = Path("giant_output") +def normalize_row(row, fieldnames): + return {field: stringify(row.get(field)) for field in fieldnames} + + +def stringify(value): + if value is None: + return "" + return str(value) + + +def read_csv_rows(path): + if not path.exists(): + return [], [] + + with path.open(newline="", encoding="utf-8") as handle: + reader = csv.DictReader(handle) + fieldnames = reader.fieldnames or [] + return fieldnames, list(reader) + + +def read_existing_order_ids(path): + _, rows = read_csv_rows(path) + return {row["order_id"] for row in rows if row.get("order_id")} + + +def merge_rows(existing_rows, new_rows, subset): + merged = [] + row_index = {} + + for row in existing_rows + new_rows: + key = tuple(stringify(row.get(field)) for field in subset) + normalized = dict(row) + if key in row_index: + merged[row_index[key]] = normalized + else: + row_index[key] = len(merged) + merged.append(normalized) + + return merged + + +def append_dedup(path, new_rows, subset, fieldnames): + existing_fieldnames, existing_rows = read_csv_rows(path) + all_fieldnames = list(dict.fromkeys(existing_fieldnames + fieldnames)) + + merged = merge_rows( + [normalize_row(row, all_fieldnames) for row in existing_rows], + [normalize_row(row, all_fieldnames) for row in new_rows], + subset=subset, + ) + + with path.open("w", newline="", encoding="utf-8") as handle: + writer = csv.DictWriter(handle, fieldnames=all_fieldnames) + writer.writeheader() + writer.writerows(merged) + + return merged + + +def write_json(path, payload): + path.write_text(json.dumps(payload, indent=2), encoding="utf-8") + + +@click.command() +@click.option("--user-id", default=None, help="Giant user id.") +@click.option("--loyalty", default=None, help="Giant loyalty number.") +@click.option( + "--outdir", + default="giant_output", + show_default=True, + help="Directory for raw json and csv outputs.", +) +@click.option( + "--sleep-seconds", + default=1.5, + show_default=True, + type=float, + help="Delay between order detail requests.", +) +def main(user_id, loyalty, outdir, sleep_seconds): + config = load_config() + user_id = user_id or config["user_id"] or click.prompt("Giant user id", type=str) + loyalty = loyalty or config["loyalty"] or click.prompt( + "Giant loyalty number", type=str + ) + + outdir = Path(outdir) rawdir = outdir / "raw" rawdir.mkdir(parents=True, exist_ok=True) + orders_csv = outdir / "orders.csv" + items_csv = outdir / "items.csv" + + click.echo("Using cookies from your current Firefox profile.") + click.echo(f"Open Giant here, confirm you're logged in, then return: {ACCOUNT_PAGE}") + click.pause(info="Press any key once Giant is open and logged in") + session = build_session() - print("fetching order history...") - history = get_history(session) + click.echo("Fetching order history...") + history = get_history(session, user_id, loyalty) + write_json(rawdir / "history.json", history) - (rawdir / "history.json").write_text( - json.dumps(history, indent=2), - encoding="utf-8", + records = history.get("records", []) + click.echo(f"History returned {len(records)} visits.") + click.echo( + "Note: Giant appears to expose only the most recent 50 visits, " + "so run this periodically if you want full continuity." ) - order_ids = [r["orderId"] for r in history.get("records", [])] - print(f"{len(order_ids)} orders found") + history_order_ids = [str(record["orderId"]) for record in records] + existing_order_ids = read_existing_order_ids(orders_csv) + new_order_ids = [order_id for order_id in history_order_ids if order_id not in existing_order_ids] + + click.echo(f"Existing orders in csv: {len(existing_order_ids)}") + click.echo(f"New orders to fetch: {len(new_order_ids)}") + + if not new_order_ids: + click.echo("No new orders found. Done.") + return details = [] - for order_id in order_ids: - print(f"fetching {order_id}") - d = get_order_detail(session, order_id) - details.append(d) + for order_id in new_order_ids: + click.echo(f"Fetching {order_id}") + detail = get_order_detail(session, user_id, order_id) + details.append(detail) + write_json(rawdir / f"{order_id}.json", detail) + time.sleep(sleep_seconds) - (rawdir / f"{order_id}.json").write_text( - json.dumps(d, indent=2), - encoding="utf-8", - ) + click.echo("Flattening new data...") + orders, items = flatten_orders(history, details) - time.sleep(1.5) + all_orders = append_dedup( + orders_csv, + orders, + subset=["order_id"], + fieldnames=ORDER_FIELDS, + ) + all_items = append_dedup( + items_csv, + items, + subset=["order_id", "line_no", "item_name", "upc", "line_total"], + fieldnames=ITEM_FIELDS, + ) - print("flattening data...") - orders_df, items_df = flatten_orders(history, details) - - orders_df.to_csv(outdir / "orders.csv", index=False) - items_df.to_csv(outdir / "items.csv", index=False) - - print("done") - print(f"{len(orders_df)} orders written to {outdir / 'orders.csv'}") - print(f"{len(items_df)} items written to {outdir / 'items.csv'}") + click.echo("Done.") + click.echo(f"Orders csv: {orders_csv}") + click.echo(f"Items csv: {items_csv}") + click.echo(f"Total orders stored: {len(all_orders)}") + click.echo(f"Total item rows stored: {len(all_items)}") if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/tests/test_bc.py b/tests/test_bc.py index abd4a3e..21fc912 100644 --- a/tests/test_bc.py +++ b/tests/test_bc.py @@ -1,28 +1,17 @@ -import requests -import browser_cookie3 +import unittest -BASE = "https://giantfood.com" -ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store" -USER_ID = "369513017" -LOYALTY = "440155630880" +try: + import browser_cookie3 # noqa: F401 + import requests # noqa: F401 +except ImportError as exc: # pragma: no cover - dependency-gated smoke test + browser_cookie3 = None + _IMPORT_ERROR = exc +else: + _IMPORT_ERROR = None -cj = browser_cookie3.firefox(domain_name="giantfood.com") -s = requests.Session() -s.cookies.update(cj) -s.headers.update({ - "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) Gecko/20100101 Firefox/148.0", - "accept": "application/json, text/plain, */*", - "accept-language": "en-US,en;q=0.9", - "referer": ACCOUNT_PAGE, -}) - -r = s.get( - f"{BASE}/api/v6.0/user/{USER_ID}/order/history", - params={"filter": "instore", "loyaltyNumber": LOYALTY}, - timeout=30, -) - -print(r.status_code) -print(r.text[:500]) +@unittest.skipIf(browser_cookie3 is None, f"optional smoke test dependency missing: {_IMPORT_ERROR}") +class BrowserCookieSmokeTest(unittest.TestCase): + def test_dependencies_available(self): + self.assertIsNotNone(browser_cookie3) diff --git a/tests/test_bc_cffi.py b/tests/test_bc_cffi.py index cb92e06..9d7120c 100644 --- a/tests/test_bc_cffi.py +++ b/tests/test_bc_cffi.py @@ -1,27 +1,17 @@ -import browser_cookie3 -from curl_cffi import requests +import unittest -BASE = "https://giantfood.com" -ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store" -USER_ID = "369513017" -LOYALTY = "440155630880" +try: + import browser_cookie3 # noqa: F401 + from curl_cffi import requests # noqa: F401 +except ImportError as exc: # pragma: no cover - dependency-gated smoke test + browser_cookie3 = None + _IMPORT_ERROR = exc +else: + _IMPORT_ERROR = None -s = requests.Session() -s.cookies.update(browser_cookie3.firefox(domain_name="giantfood.com")) -s.headers.update({ - "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) Gecko/20100101 Firefox/148.0", - "accept": "application/json, text/plain, */*", - "accept-language": "en-US,en;q=0.9", - "referer": ACCOUNT_PAGE, -}) -r = s.get( - f"{BASE}/api/v6.0/user/{USER_ID}/order/history", - params={"filter": "instore", "loyaltyNumber": LOYALTY}, - impersonate="firefox", - timeout=30, -) - -print(r.status_code) -print(r.text[:500]) +@unittest.skipIf(browser_cookie3 is None, f"optional smoke test dependency missing: {_IMPORT_ERROR}") +class CurlCffiSmokeTest(unittest.TestCase): + def test_dependencies_available(self): + self.assertIsNotNone(browser_cookie3) diff --git a/tests/test_giant_login.py b/tests/test_giant_login.py index 5e579d6..d0b81db 100644 --- a/tests/test_giant_login.py +++ b/tests/test_giant_login.py @@ -1,66 +1,17 @@ -import requests -from playwright.sync_api import sync_playwright - -BASE = "https://giantfood.com" -ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store" - -USER_ID = "369513017" -LOYALTY = "440155630880" +import unittest -def get_session(): - with sync_playwright() as p: - browser = p.firefox.launch(headless=False) - page = browser.new_page() - - page.goto(ACCOUNT_PAGE) - - print("log in manually in the browser, then press ENTER here") - input() - - cookies = page.context.cookies() - ua = page.evaluate("() => navigator.userAgent") - - browser.close() - - s = requests.Session() - - s.headers.update({ - "user-agent": ua, - "accept": "application/json, text/plain, */*", - "referer": ACCOUNT_PAGE, - }) - - for c in cookies: - domain = c.get("domain", "").lstrip(".") or "giantfood.com" - s.cookies.set(c["name"], c["value"], domain=domain) - - return s +try: + from playwright.sync_api import sync_playwright # noqa: F401 + import requests # noqa: F401 +except ImportError as exc: # pragma: no cover - dependency-gated smoke test + sync_playwright = None + _IMPORT_ERROR = exc +else: + _IMPORT_ERROR = None -def test_history(session): - url = f"{BASE}/api/v6.0/user/{USER_ID}/order/history" - - r = session.get( - url, - params={ - "filter": "instore", - "loyaltyNumber": LOYALTY, - }, - ) - - print("status:", r.status_code) - print() - - data = r.json() - - print("orders found:", len(data.get("records", []))) - print() - - for rec in data.get("records", [])[:5]: - print(rec["orderId"], rec["orderDate"], rec["orderTotal"]) - - -if __name__ == "__main__": - session = get_session() - test_history(session) +@unittest.skipIf(sync_playwright is None, f"optional smoke test dependency missing: {_IMPORT_ERROR}") +class GiantLoginSmokeTest(unittest.TestCase): + def test_dependencies_available(self): + self.assertIsNotNone(sync_playwright) diff --git a/tests/test_scraper.py b/tests/test_scraper.py new file mode 100644 index 0000000..de2981f --- /dev/null +++ b/tests/test_scraper.py @@ -0,0 +1,117 @@ +import csv +import tempfile +import unittest +from pathlib import Path + +import scraper + + +class ScraperTests(unittest.TestCase): + def test_flatten_orders_extracts_order_and_item_rows(self): + history = { + "records": [ + { + "orderId": "abc123", + "serviceType": "PICKUP", + } + ] + } + details = [ + { + "orderId": "abc123", + "orderDate": "2026-03-01", + "deliveryDate": "2026-03-02", + "orderTotal": "12.34", + "paymentMethod": "VISA", + "totalItemCount": 1, + "totalSavings": "1.00", + "yourSavingsTotal": "1.00", + "couponsDiscountsTotal": "0.50", + "refundOrder": False, + "ebtOrder": False, + "pup": { + "storeName": "Giant", + "aholdStoreNumber": "42", + "storeAddress1": "123 Main", + "storeCity": "Springfield", + "storeState": "VA", + "storeZipcode": "22150", + }, + "items": [ + { + "podId": "pod-1", + "itemName": "Bananas", + "primUpcCd": "111", + "categoryId": "produce", + "categoryDesc": "Produce", + "shipQy": "2", + "lbEachCd": "EA", + "unitPrice": "0.59", + "groceryAmount": "1.18", + "totalPickedWeight": "", + "mvpSavings": "0.10", + "rewardSavings": "0.00", + "couponSavings": "0.00", + "couponPrice": "", + } + ], + } + ] + + orders, items = scraper.flatten_orders(history, details) + + self.assertEqual(1, len(orders)) + self.assertEqual("abc123", orders[0]["order_id"]) + self.assertEqual("PICKUP", orders[0]["service_type"]) + self.assertEqual(1, len(items)) + self.assertEqual("1", items[0]["line_no"]) + self.assertEqual("Bananas", items[0]["item_name"]) + + def test_append_dedup_replaces_duplicate_rows_and_preserves_new_values(self): + with tempfile.TemporaryDirectory() as tmpdir: + path = Path(tmpdir) / "orders.csv" + + scraper.append_dedup( + path, + [ + {"order_id": "1", "order_total": "10.00"}, + {"order_id": "2", "order_total": "20.00"}, + ], + subset=["order_id"], + fieldnames=["order_id", "order_total"], + ) + + merged = scraper.append_dedup( + path, + [ + {"order_id": "2", "order_total": "21.50"}, + {"order_id": "3", "order_total": "30.00"}, + ], + subset=["order_id"], + fieldnames=["order_id", "order_total"], + ) + + self.assertEqual( + [ + {"order_id": "1", "order_total": "10.00"}, + {"order_id": "2", "order_total": "21.50"}, + {"order_id": "3", "order_total": "30.00"}, + ], + merged, + ) + + with path.open(newline="", encoding="utf-8") as handle: + rows = list(csv.DictReader(handle)) + + self.assertEqual(merged, rows) + + def test_read_existing_order_ids_returns_known_ids(self): + with tempfile.TemporaryDirectory() as tmpdir: + path = Path(tmpdir) / "orders.csv" + path.write_text("order_id,order_total\n1,10.00\n2,20.00\n", encoding="utf-8") + + self.assertEqual({"1", "2"}, scraper.read_existing_order_ids(path)) + + +if __name__ == "__main__": + unittest.main()