Compare commits

...

2 Commits

Author SHA1 Message Date
ben
4fd309251d Record t1.8.6 task evidence 2026-03-16 13:54:11 -04:00
ben
7789c2e6ae Add shared browser session bootstrap 2026-03-16 13:54:00 -04:00
8 changed files with 745 additions and 36 deletions

194
browser_session.py Normal file
View File

@@ -0,0 +1,194 @@
import configparser
import os
import shutil
import sqlite3
import tempfile
from dataclasses import dataclass
from pathlib import Path
import browser_cookie3
@dataclass
class StorageEntry:
    """One key/value record read out of a Firefox storage sqlite database."""

    origin: str  # normalized origin, e.g. "https://www.costco.com"
    key: str  # the storage key exactly as the browser stored it
    value: str  # raw stored value (often JSON text)
    source: str  # posix path of the sqlite file the entry came from
@dataclass
class BrowserContext:
    """Everything extracted from one browser profile for a given domain."""

    cookies: object  # cookie jar produced by browser_cookie3.firefox()
    storage_entries: list[StorageEntry]  # localStorage/webappsstore records
def load_browser_context(
    browser,
    domain_name,
    storage_origins=None,
    profile_dir=None,
):
    """Collect cookies and storage entries for *domain_name* from a profile.

    Only Firefox is supported; any other *browser* raises ValueError. When
    *profile_dir* is omitted the default profile is discovered automatically.
    """
    if browser != "firefox":
        raise ValueError(f"unsupported browser: {browser}")
    if profile_dir:
        profile = Path(profile_dir)
    else:
        profile = find_firefox_profile_dir()
    return BrowserContext(
        cookies=load_firefox_cookies(domain_name, profile),
        storage_entries=read_firefox_storage_entries(
            profile,
            origin_filters=storage_origins or [],
        ),
    )
def find_firefox_profile_dir():
    """Locate the preferred Firefox profile directory via profiles.ini.

    A profile flagged ``Default=1`` wins; ties break on the path string.
    Raises FileNotFoundError when profiles.ini or any profile is missing.
    """
    ini_path = firefox_profiles_root() / "profiles.ini"
    parser = configparser.RawConfigParser()
    if not ini_path.exists():
        raise FileNotFoundError(f"Firefox profiles.ini not found at {ini_path}")
    parser.read(ini_path, encoding="utf-8")

    candidates = []
    for section in parser.sections():
        if not section.startswith("Profile"):
            continue
        raw_path = parser.get(section, "Path", fallback="")
        if not raw_path:
            continue
        # Relative paths (the default per the ini format) resolve against the
        # directory that contains profiles.ini.
        if parser.getboolean(section, "IsRelative", fallback=True):
            resolved = ini_path.parent / raw_path
        else:
            resolved = Path(raw_path)
        is_default = parser.getboolean(section, "Default", fallback=False)
        candidates.append((is_default, resolved))

    if not candidates:
        raise FileNotFoundError("No Firefox profiles found in profiles.ini")
    # min() with (not default, str(path)) picks the default profile first,
    # then the lexicographically smallest path as a deterministic tiebreak.
    best = min(candidates, key=lambda pair: (not pair[0], str(pair[1])))
    return best[1]
def firefox_profiles_root():
    """Return the OS-specific directory that holds Firefox profiles.

    Raises FileNotFoundError on Windows when APPDATA is unset or blank.
    """
    if os.name != "nt":
        return Path.home() / ".mozilla" / "firefox"
    appdata = os.getenv("APPDATA", "").strip()
    if not appdata:
        raise FileNotFoundError("APPDATA is not set")
    return Path(appdata) / "Mozilla" / "Firefox"
def load_firefox_cookies(domain_name, profile_dir):
    """Read cookies for *domain_name* from the profile's cookies.sqlite."""
    sqlite_path = Path(profile_dir) / "cookies.sqlite"
    return browser_cookie3.firefox(
        cookie_file=str(sqlite_path),
        domain_name=domain_name,
    )
def read_firefox_storage_entries(profile_dir, origin_filters):
    """Collect localStorage entries from both Firefox storage backends.

    Results are de-duplicated on (origin, key, value, source), keeping the
    first occurrence so ls-backend entries precede webappsstore duplicates.
    """
    profile = Path(profile_dir)
    combined = list(read_firefox_ls_entries(profile, origin_filters))
    combined += read_firefox_webapps_entries(profile, origin_filters)

    seen = set()
    unique = []
    for entry in combined:
        fingerprint = (entry.origin, entry.key, entry.value, entry.source)
        if fingerprint not in seen:
            seen.add(fingerprint)
            unique.append(entry)
    return unique
def read_firefox_ls_entries(profile_dir, origin_filters):
    """Read entries from per-origin ls/data.sqlite files under storage/default."""
    storage_root = profile_dir / "storage" / "default"
    results = []
    if not storage_root.exists():
        return results
    for db_path in storage_root.glob("*/ls/data.sqlite"):
        # The encoded origin is the directory name two levels above data.sqlite.
        origin = decode_firefox_origin(db_path.parents[1].name)
        if not origin_matches(origin, origin_filters):
            continue
        results += [
            StorageEntry(
                origin=origin,
                key=stringify_sql_value(raw_key),
                value=stringify_sql_value(raw_value),
                source=db_path.as_posix(),
            )
            for raw_key, raw_value in query_sqlite(
                db_path, "SELECT key, value FROM data"
            )
        ]
    return results
def read_firefox_webapps_entries(profile_dir, origin_filters):
    """Read entries from the legacy webappsstore.sqlite database, if present."""
    db_path = profile_dir / "webappsstore.sqlite"
    if not db_path.exists():
        return []
    rows = query_sqlite(db_path, "SELECT originKey, key, value FROM webappsstore2")
    results = []
    for raw_origin, raw_key, raw_value in rows:
        origin = stringify_sql_value(raw_origin)
        if not origin_matches(origin, origin_filters):
            continue
        results.append(
            StorageEntry(
                origin=origin,
                key=stringify_sql_value(raw_key),
                value=stringify_sql_value(raw_value),
                source=db_path.as_posix(),
            )
        )
    return results
def query_sqlite(path, query):
    """Execute *query* against a disposable copy of the SQLite db at *path*.

    Firefox keeps its databases locked while it is running, so the query runs
    against a temp copy that is always removed afterwards. Returns [] when the
    database or table cannot be read (sqlite3.OperationalError).
    """
    copied_path = copy_sqlite_to_temp(path)
    try:
        connection = sqlite3.connect(copied_path)
        try:
            return list(connection.execute(query))
        finally:
            # ``with sqlite3.connect(...)`` only commits/rolls back — it never
            # closes the connection, which kept the temp file locked on Windows
            # and made the unlink below fail there. Close explicitly.
            connection.close()
    except sqlite3.OperationalError:
        return []
    finally:
        copied_path.unlink(missing_ok=True)
def copy_sqlite_to_temp(path):
    """Copy *path* to a private temp file and return the copy's Path.

    The temp file keeps the original suffix; the caller must delete it.
    """
    original = Path(path)
    handle = tempfile.NamedTemporaryFile(delete=False, suffix=original.suffix)
    with handle:
        destination = Path(handle.name)
    shutil.copy2(original, destination)
    return destination
def decode_firefox_origin(raw_origin):
    """Turn a Firefox storage directory name back into an origin URL.

    Firefox encodes "https://host" as "https+++host" and appends metadata
    (e.g. partition keys) after a "^", which is discarded here.
    """
    base, _, _ = raw_origin.partition("^")
    return base.replace("+++", "://")
def origin_matches(origin, origin_filters):
    """Case-insensitive substring match of *origin* against any filter.

    An empty filter list matches every origin.
    """
    if not origin_filters:
        return True
    haystack = origin.lower()
    for candidate in origin_filters:
        if candidate.lower() in haystack:
            return True
    return False
def stringify_sql_value(value):
    """Coerce a raw sqlite cell to str: None -> "", bytes decoded, else str()."""
    if value is None:
        return ""
    if not isinstance(value, bytes):
        return str(value)
    # Firefox stores some blobs as UTF-8 and some as UTF-16; try each in turn
    # and fall back to a lossy UTF-8 decode rather than raising.
    for encoding in ("utf-8", "utf-16-le", "utf-16"):
        try:
            return value.decode(encoding)
        except UnicodeDecodeError:
            continue
    return value.decode("utf-8", errors="ignore")

View File

@@ -254,7 +254,7 @@
- commit: `c0054dc` on branch `cx` - commit: `c0054dc` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python scrape_costco.py --help`; verified Costco summary/detail flattening now uses composite receipt keys in unit tests - tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python scrape_costco.py --help`; verified Costco summary/detail flattening now uses composite receipt keys in unit tests
- date: 2026-03-16 - date: 2026-03-16
* [ ] t1.8.6: add browser session helper (2-4 commits) * [X] t1.8.6: add browser session helper (2-4 commits)
** acceptance criteria ** acceptance criteria
- create a separate Python module/script that extracts firefox browser session data needed for giant and costco scrapers. - create a separate Python module/script that extracts firefox browser session data needed for giant and costco scrapers.
@@ -273,9 +273,9 @@
- Firefox only; Chromium support later - Firefox only; Chromium support later
** evidence ** evidence
- commit: - commit: `7789c2e` on branch `cx`
- tests: - tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python scrape_giant.py --help`; `./venv/bin/python scrape_costco.py --help`; verified Firefox storage token extraction and locked-db copy behavior in unit tests
- date: - date: 2026-03-16
* [ ] t1.9: compute normalized comparison metrics (2-4 commits) * [ ] t1.9: compute normalized comparison metrics (2-4 commits)
** acceptance criteria ** acceptance criteria

136
retailer_sessions.py Normal file
View File

@@ -0,0 +1,136 @@
import json
import re
from dataclasses import dataclass
from browser_session import load_browser_context
# Canonical 8-4-4-4-12 hex UUID shape, used to recognize Costco client ids.
UUID_RE = re.compile(
    r"^[0-9a-fA-F]{8}-"
    r"[0-9a-fA-F]{4}-"
    r"[0-9a-fA-F]{4}-"
    r"[0-9a-fA-F]{4}-"
    r"[0-9a-fA-F]{12}$"
)
# Three dot-separated base64url segments: the structural shape of a JWT.
JWT_RE = re.compile(r"^[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+$")
@dataclass
class RetailerSession:
    """Cookies plus extra HTTP headers needed to call one retailer's API."""

    cookies: object  # cookie jar loaded from the browser profile
    headers: dict[str, str]  # auth-related headers; empty when none are needed
def load_giant_session(browser="firefox", profile_dir=None):
    """Build a RetailerSession for giantfood.com.

    Giant authenticates purely via cookies, so no headers are extracted.
    """
    giant_context = load_browser_context(
        browser=browser,
        domain_name="giantfood.com",
        storage_origins=["giantfood.com"],
        profile_dir=profile_dir,
    )
    return RetailerSession(cookies=giant_context.cookies, headers={})
def load_costco_session(browser="firefox", profile_dir=None):
    """Build a RetailerSession for costco.com.

    Costco needs auth headers recovered from browser storage in addition to
    its cookies.
    """
    costco_context = load_browser_context(
        browser=browser,
        domain_name=".costco.com",
        storage_origins=["costco.com"],
        profile_dir=profile_dir,
    )
    extracted_headers = extract_costco_headers(costco_context.storage_entries)
    return RetailerSession(cookies=costco_context.cookies, headers=extracted_headers)
def extract_costco_headers(storage_entries):
    """Recover Costco auth headers from browser storage entries.

    Scans every storage key/value pair (including values nested inside JSON)
    and keeps the FIRST candidate found for each of the three headers:
    a bearer-JWT authorization, a UUID client id, and a UUID client
    identifier. Returns only the headers that were actually found.
    """
    authorization = ""
    client_id = ""
    client_identifier = ""
    for key_path, value in iter_storage_candidates(storage_entries):
        normalized_key = normalize_key(key_path)
        normalized_value = str(value).strip()
        if not normalized_value:
            continue
        # The three checks are ordered most- to least-specific; each ``continue``
        # prevents one value from being claimed for more than one header.
        if not authorization and looks_like_authorization(normalized_key, normalized_value):
            authorization = normalize_authorization(normalized_value)
            continue
        # "clientidentifier" is checked before "clientid" because the former
        # key contains the latter as a substring.
        if not client_identifier and looks_like_client_identifier(
            normalized_key, normalized_value
        ):
            client_identifier = normalized_value
            continue
        if not client_id and looks_like_client_id(normalized_key, normalized_value):
            client_id = normalized_value
    headers = {}
    if authorization:
        headers["costco-x-authorization"] = authorization
    if client_id:
        headers["costco-x-wcs-clientId"] = client_id
    if client_identifier:
        headers["client-identifier"] = client_identifier
    return headers
def iter_storage_candidates(storage_entries):
    """Yield (key_path, value) pairs from entries, including nested JSON values."""
    for entry in storage_entries:
        yield entry.key, entry.value
        # Values that parse as JSON objects/arrays are walked recursively so
        # tokens buried inside serialized state are still discoverable.
        parsed = parse_json_value(entry.value)
        yield from walk_candidate_value(entry.key, parsed)
def walk_candidate_value(prefix, value):
    """Recursively yield (dotted/indexed path, value) pairs from dicts/lists."""
    if isinstance(value, dict):
        children = ((f"{prefix}.{name}", child) for name, child in value.items())
    elif isinstance(value, list):
        children = ((f"{prefix}[{idx}]", child) for idx, child in enumerate(value))
    else:
        # Scalars have no children to walk.
        return
    for child_path, child in children:
        yield child_path, child
        yield from walk_candidate_value(child_path, child)
def parse_json_value(value):
    """Parse *value* as JSON when it looks like an object/array; else as-is.

    Only "{"/"["-prefixed strings are attempted, so bare numerals and words
    are never converted. Malformed JSON is returned unchanged.
    """
    if not isinstance(value, str):
        return value
    stripped = value.strip()
    if stripped.startswith(("{", "[")):
        try:
            return json.loads(stripped)
        except json.JSONDecodeError:
            pass
    return value
def normalize_key(value):
    """Reduce a key path to lowercase ASCII alphanumerics for fuzzy matching."""
    allowed = set("abcdefghijklmnopqrstuvwxyz0123456789")
    return "".join(ch for ch in value.lower() if ch in allowed)
def looks_like_authorization(key, value):
    """True when the key hints at a token and the value normalizes to a JWT."""
    if "authorization" not in key and "token" not in key:
        return False
    # normalize_authorization returns "" for anything that is not a JWT.
    return bool(normalize_authorization(value))
def normalize_authorization(value):
    """Normalize *value* to "Bearer <jwt>"; return "" unless it is a JWT.

    Accepts either a bare JWT or one already carrying a (case-insensitive)
    "Bearer " prefix, which is re-emitted with canonical capitalization.
    """
    candidate = str(value).strip()
    if candidate.lower().startswith("bearer "):
        token = candidate.split(None, 1)[1].strip()
    else:
        token = candidate
    return f"Bearer {token}" if JWT_RE.match(token) else ""
def looks_like_client_id(key, value):
    """True for keys mentioning "clientid" (but not "identifier") with UUID values."""
    if "clientid" not in key or "identifier" in key:
        return False
    return UUID_RE.match(value) is not None
def looks_like_client_identifier(key, value):
    """True for keys mentioning "clientidentifier" whose value is a UUID."""
    if "clientidentifier" not in key:
        return False
    return UUID_RE.match(value) is not None

View File

@@ -1,17 +1,16 @@
import os
import csv import csv
import json import json
import time import time
import re import re
from dotenv import load_dotenv
from calendar import monthrange from calendar import monthrange
from datetime import datetime, timedelta from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
import click import click
import browser_cookie3
from curl_cffi import requests from curl_cffi import requests
from retailer_sessions import load_costco_session
BASE_URL = "https://ecom-api.costco.com/ebusiness/order/v1/orders/graphql" BASE_URL = "https://ecom-api.costco.com/ebusiness/order/v1/orders/graphql"
RETAILER = "costco" RETAILER = "costco"
@@ -210,16 +209,7 @@ ITEM_FIELDS = [
"is_coupon_line", "is_coupon_line",
] ]
def build_headers(auth_headers):
def load_config():
load_dotenv()
return {
"authorization": os.getenv("COSTCO_X_AUTHORIZATION", "").strip(),
"client_id": os.getenv("COSTCO_X_WCS_CLIENTID", "").strip(),
"client_identifier": os.getenv("COSTCO_CLIENT_IDENTIFIER", "").strip(),
}
def build_headers(config):
headers = { headers = {
"accept": "*/*", "accept": "*/*",
"content-type": "application/json-patch+json", "content-type": "application/json-patch+json",
@@ -232,18 +222,14 @@ def build_headers(config):
"Gecko/20100101 Firefox/148.0" "Gecko/20100101 Firefox/148.0"
), ),
} }
if config["authorization"]: headers.update(auth_headers)
headers["costco-x-authorization"] = config["authorization"]
if config["client_id"]:
headers["costco-x-wcs-clientId"] = config["client_id"]
if config["client_identifier"]:
headers["client-identifier"] = config["client_identifier"]
return headers return headers
def build_session(config): def build_session():
retailer_session = load_costco_session()
session = requests.Session() session = requests.Session()
session.cookies.update(browser_cookie3.firefox(domain_name=".costco.com")) session.cookies.update(retailer_session.cookies)
session.headers.update(build_headers(config)) session.headers.update(build_headers(retailer_session.headers))
return session return session
@@ -596,17 +582,10 @@ def main(outdir, document_type, document_sub_type, window_days, months_back):
outdir = Path(outdir) outdir = Path(outdir)
raw_dir = outdir / "raw" raw_dir = outdir / "raw"
try: try:
config = load_config() session = build_session()
click.echo(
"auth headers present: "
f"authorization={bool(config['authorization'])}, "
f"client_id={bool(config['client_id'])}, "
f"client_identifier={bool(config['client_identifier'])}"
)
session = build_session(config)
except Exception as exc: except Exception as exc:
raise click.ClickException( raise click.ClickException(
f"failed to load Costco Firefox cookies: {exc}" f"failed to load Costco browser session: {exc}"
) from exc ) from exc
start_date, end_date = resolve_date_range(months_back) start_date, end_date = resolve_date_range(months_back)

333
scrape_giant.py Normal file
View File

@@ -0,0 +1,333 @@
import csv
import json
import os
import time
from pathlib import Path
import click
from dotenv import load_dotenv
from curl_cffi import requests
from retailer_sessions import load_giant_session
BASE = "https://giantfood.com"
# Referer sent with API calls; mirrors the page a browser would be on.
ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store"
# CSV column order for orders.csv.
ORDER_FIELDS = [
    "order_id",
    "order_date",
    "delivery_date",
    "service_type",
    "order_total",
    "payment_method",
    "total_item_count",
    "total_savings",
    "your_savings_total",
    "coupons_discounts_total",
    "store_name",
    "store_number",
    "store_address1",
    "store_city",
    "store_state",
    "store_zipcode",
    "refund_order",
    "ebt_order",
]
# CSV column order for items.csv; (order_id, line_no) identifies a line.
ITEM_FIELDS = [
    "order_id",
    "order_date",
    "line_no",
    "pod_id",
    "item_name",
    "upc",
    "category_id",
    "category",
    "qty",
    "unit",
    "unit_price",
    "line_total",
    "picked_weight",
    "mvp_savings",
    "reward_savings",
    "coupon_savings",
    "coupon_price",
]
def load_config():
    """Load optional Giant credentials from the environment (.env supported).

    Returns a dict with "user_id" and "loyalty"; either may be "" when unset.
    """
    # load_dotenv is imported unconditionally at module top, so the previous
    # ``if load_dotenv is not None`` guard was dead code and has been removed.
    load_dotenv()
    return {
        "user_id": os.getenv("GIANT_USER_ID", "").strip(),
        "loyalty": os.getenv("GIANT_LOYALTY_NUMBER", "").strip(),
    }
def build_session():
    """Create a curl_cffi session seeded with Giant cookies and browser headers."""
    giant = load_giant_session()
    session = requests.Session()
    session.cookies.update(giant.cookies)
    browser_headers = {
        "user-agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) "
            "Gecko/20100101 Firefox/148.0"
        ),
        "accept": "application/json, text/plain, */*",
        "accept-language": "en-US,en;q=0.9",
        "referer": ACCOUNT_PAGE,
    }
    session.headers.update(browser_headers)
    return session
def safe_get(session, url, **kwargs):
    """GET *url* with up to 3 attempts, returning the first 200 response.

    Non-200 responses and transport errors are retried after a 3s pause.
    Once attempts are exhausted, raise_for_status() is called on the last
    response received; a RuntimeError is raised if no response ever arrived.
    """
    last_response = None
    attempts = 3
    for attempt in range(attempts):
        try:
            response = session.get(
                url,
                impersonate="firefox",
                timeout=30,
                **kwargs,
            )
            last_response = response
            if response.status_code == 200:
                return response
            click.echo(f"retry {attempt + 1}/{attempts} status={response.status_code}")
        except Exception as exc:  # pragma: no cover - network error path
            click.echo(f"retry {attempt + 1}/{attempts} error={exc}")
        # Previously this slept even after the final failure, delaying the
        # error by 3s for nothing; only sleep when another attempt follows.
        if attempt < attempts - 1:
            time.sleep(3)
    if last_response is not None:
        last_response.raise_for_status()
    raise RuntimeError(f"failed to fetch {url}")
def get_history(session, user_id, loyalty):
    """Fetch the in-store order history for *user_id* as parsed JSON."""
    url = f"{BASE}/api/v6.0/user/{user_id}/order/history"
    params = {"filter": "instore", "loyaltyNumber": loyalty}
    return safe_get(session, url, params=params).json()
def get_order_detail(session, user_id, order_id):
    """Fetch the detail payload for one in-store order as parsed JSON."""
    url = f"{BASE}/api/v6.0/user/{user_id}/order/history/detail/{order_id}"
    return safe_get(session, url, params={"isInStore": "true"}).json()
def flatten_orders(history, details):
    """Flatten order-detail payloads into (orders, items) row dicts for CSV.

    *history* supplies per-order fields (serviceType) that the detail payload
    lacks; *details* is the list of per-order detail payloads. Returns two
    lists of dicts keyed to ORDER_FIELDS / ITEM_FIELDS respectively.
    """
    orders = []
    items = []
    # Index history records by orderId so each detail can pull its serviceType.
    history_lookup = {record["orderId"]: record for record in history.get("records", [])}
    for detail in details:
        order_id = str(detail["orderId"])
        # Lookup uses the raw (unstringified) orderId to match history's keys.
        history_row = history_lookup.get(detail["orderId"], {})
        # "pup" holds the pickup-store block; missing for some order types.
        pickup = detail.get("pup", {})
        orders.append(
            {
                "order_id": order_id,
                "order_date": detail.get("orderDate"),
                "delivery_date": detail.get("deliveryDate"),
                "service_type": history_row.get("serviceType"),
                "order_total": detail.get("orderTotal"),
                "payment_method": detail.get("paymentMethod"),
                "total_item_count": detail.get("totalItemCount"),
                "total_savings": detail.get("totalSavings"),
                "your_savings_total": detail.get("yourSavingsTotal"),
                "coupons_discounts_total": detail.get("couponsDiscountsTotal"),
                "store_name": pickup.get("storeName"),
                "store_number": pickup.get("aholdStoreNumber"),
                "store_address1": pickup.get("storeAddress1"),
                "store_city": pickup.get("storeCity"),
                "store_state": pickup.get("storeState"),
                "store_zipcode": pickup.get("storeZipcode"),
                "refund_order": detail.get("refundOrder"),
                "ebt_order": detail.get("ebtOrder"),
            }
        )
        # line_no is a synthetic 1-based position within the order.
        for line_no, item in enumerate(detail.get("items", []), start=1):
            items.append(
                {
                    "order_id": order_id,
                    "order_date": detail.get("orderDate"),
                    "line_no": str(line_no),
                    "pod_id": item.get("podId"),
                    "item_name": item.get("itemName"),
                    "upc": item.get("primUpcCd"),
                    "category_id": item.get("categoryId"),
                    "category": item.get("categoryDesc"),
                    "qty": item.get("shipQy"),
                    "unit": item.get("lbEachCd"),
                    "unit_price": item.get("unitPrice"),
                    "line_total": item.get("groceryAmount"),
                    "picked_weight": item.get("totalPickedWeight"),
                    "mvp_savings": item.get("mvpSavings"),
                    "reward_savings": item.get("rewardSavings"),
                    "coupon_savings": item.get("couponSavings"),
                    "coupon_price": item.get("couponPrice"),
                }
            )
    return orders, items
def normalize_row(row, fieldnames):
    """Project *row* onto *fieldnames*, stringifying values ("" for missing)."""
    normalized = {}
    for field in fieldnames:
        normalized[field] = stringify(row.get(field))
    return normalized
def stringify(value):
    """Render *value* as a string, mapping None to the empty string."""
    return "" if value is None else str(value)
def read_csv_rows(path):
    """Read a CSV into (fieldnames, rows); both empty when *path* is absent."""
    if not path.exists():
        return [], []
    with path.open(newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        header = list(reader.fieldnames or [])
        return header, list(reader)
def read_existing_order_ids(path):
    """Return the set of non-empty order_id values already stored in *path*."""
    existing = set()
    for row in read_csv_rows(path)[1]:
        order_id = row.get("order_id")
        if order_id:
            existing.add(order_id)
    return existing
def merge_rows(existing_rows, new_rows, subset):
    """Merge rows, de-duplicating on the *subset* key columns.

    Later rows replace earlier rows with the same key in place, so the first
    occurrence keeps its position while its content is updated.
    """
    merged = []
    position_by_key = {}
    for row in existing_rows + new_rows:
        normalized = dict(row)
        dedup_key = tuple(stringify(normalized.get(field)) for field in subset)
        position = position_by_key.get(dedup_key)
        if position is None:
            position_by_key[dedup_key] = len(merged)
            merged.append(normalized)
        else:
            merged[position] = normalized
    return merged
def append_dedup(path, new_rows, subset, fieldnames):
    """Merge *new_rows* into the CSV at *path*, de-duplicating on *subset*.

    The header is the union of existing and requested fieldnames in first-seen
    order. The merged rows are rewritten in full and also returned.
    """
    existing_fieldnames, existing_rows = read_csv_rows(path)
    # dict.fromkeys preserves order while dropping duplicate column names.
    header = list(dict.fromkeys(existing_fieldnames + fieldnames))
    merged = merge_rows(
        [normalize_row(row, header) for row in existing_rows],
        [normalize_row(row, header) for row in new_rows],
        subset=subset,
    )
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=header)
        writer.writeheader()
        writer.writerows(merged)
    return merged
def write_json(path, payload):
    """Serialize *payload* as pretty-printed JSON to *path* (UTF-8)."""
    text = json.dumps(payload, indent=2)
    path.write_text(text, encoding="utf-8")
@click.command()
@click.option("--user-id", default=None, help="Giant user id.")
@click.option("--loyalty", default=None, help="Giant loyalty number.")
@click.option(
    "--outdir",
    default="giant_output",
    show_default=True,
    help="Directory for raw json and csv outputs.",
)
@click.option(
    "--sleep-seconds",
    default=1.5,
    show_default=True,
    type=float,
    help="Delay between order detail requests.",
)
def main(user_id, loyalty, outdir, sleep_seconds):
    """Scrape Giant in-store order history into raw JSON and de-duplicated CSVs.

    Credential resolution order: CLI option, then environment (.env), then an
    interactive prompt. Only orders not already present in orders.csv are
    fetched in detail.
    """
    config = load_config()
    user_id = user_id or config["user_id"] or click.prompt("Giant user id", type=str)
    loyalty = loyalty or config["loyalty"] or click.prompt(
        "Giant loyalty number", type=str
    )
    outdir = Path(outdir)
    rawdir = outdir / "raw"
    rawdir.mkdir(parents=True, exist_ok=True)
    orders_csv = outdir / "orders.csv"
    items_csv = outdir / "items.csv"
    # Previously stored order ids let us skip detail fetches on re-runs.
    existing_order_ids = read_existing_order_ids(orders_csv)
    session = build_session()
    history = get_history(session, user_id, loyalty)
    write_json(rawdir / "history.json", history)
    records = history.get("records", [])
    click.echo(f"history returned {len(records)} visits; Giant exposes only the most recent 50")
    unseen_records = [
        record
        for record in records
        if stringify(record.get("orderId")) not in existing_order_ids
    ]
    click.echo(
        f"found {len(unseen_records)} unseen visits "
        f"({len(existing_order_ids)} already stored)"
    )
    details = []
    for index, record in enumerate(unseen_records, start=1):
        order_id = stringify(record.get("orderId"))
        click.echo(f"[{index}/{len(unseen_records)}] fetching {order_id}")
        detail = get_order_detail(session, user_id, order_id)
        # Raw per-order payloads are kept alongside the CSVs for debugging.
        write_json(rawdir / f"{order_id}.json", detail)
        details.append(detail)
        # Rate-limit between detail requests; no sleep after the last one.
        if index < len(unseen_records):
            time.sleep(sleep_seconds)
    orders, items = flatten_orders(history, details)
    merged_orders = append_dedup(
        orders_csv,
        orders,
        subset=["order_id"],
        fieldnames=ORDER_FIELDS,
    )
    merged_items = append_dedup(
        items_csv,
        items,
        subset=["order_id", "line_no"],
        fieldnames=ITEM_FIELDS,
    )
    click.echo(
        f"wrote {len(orders)} new orders / {len(items)} new items "
        f"({len(merged_orders)} total orders, {len(merged_items)} total items)"
    )

5
scraper.py Normal file
View File

@@ -0,0 +1,5 @@
# Backwards-compatible shim: re-export everything from scrape_giant so older
# invocations of ``scraper.py`` keep working unchanged.
from scrape_giant import *  # noqa: F401,F403

if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,62 @@
import sqlite3
import tempfile
import unittest
from pathlib import Path
import browser_session
import retailer_sessions
class BrowserSessionTests(unittest.TestCase):
    """Unit tests for Firefox storage extraction and Costco header recovery."""

    def test_read_firefox_ls_entries_reads_storage_from_copied_sqlite(self):
        """A synthetic profile's ls/data.sqlite yields a decoded StorageEntry."""
        with tempfile.TemporaryDirectory() as tmpdir:
            profile_dir = Path(tmpdir) / "abcd.default-release"
            # Directory name encodes the origin the module must decode.
            ls_dir = profile_dir / "storage" / "default" / "https+++www.costco.com" / "ls"
            ls_dir.mkdir(parents=True)
            db_path = ls_dir / "data.sqlite"
            with sqlite3.connect(db_path) as connection:
                connection.execute("CREATE TABLE data (key TEXT, value TEXT)")
                connection.execute(
                    "INSERT INTO data (key, value) VALUES (?, ?)",
                    ("session", '{"costco":{"clientIdentifier":"481b1aec-aa3b-454b-b81b-48187e28f205"}}'),
                )
            entries = browser_session.read_firefox_storage_entries(
                profile_dir,
                origin_filters=["costco.com"],
            )
            self.assertEqual(1, len(entries))
            self.assertEqual("https://www.costco.com", entries[0].origin)
            self.assertEqual("session", entries[0].key)

    def test_extract_costco_headers_from_storage_json(self):
        """All three Costco headers are recovered from one JSON storage value."""
        entries = [
            browser_session.StorageEntry(
                origin="https://www.costco.com",
                key="authState",
                value=(
                    '{"authorization":"Bearer header.payload.signature",'
                    '"wcsClientId":"4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",'
                    '"clientIdentifier":"481b1aec-aa3b-454b-b81b-48187e28f205"}'
                ),
                source="memory",
            )
        ]
        headers = retailer_sessions.extract_costco_headers(entries)
        self.assertEqual("Bearer header.payload.signature", headers["costco-x-authorization"])
        self.assertEqual(
            "4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",
            headers["costco-x-wcs-clientId"],
        )
        self.assertEqual(
            "481b1aec-aa3b-454b-b81b-48187e28f205",
            headers["client-identifier"],
        )
# Allow running this test module directly with ``python`` as well as pytest/unittest.
if __name__ == "__main__":
    unittest.main()

View File

@@ -234,7 +234,7 @@ class CostcoPipelineTests(unittest.TestCase):
self.assertEqual("VISA", orders[0]["payment_method"]) self.assertEqual("VISA", orders[0]["payment_method"])
self.assertEqual("true", items[0]["is_coupon_line"]) self.assertEqual("true", items[0]["is_coupon_line"])
self.assertIn("dup::2026-03-12T16:16:00.json", items[0]["raw_order_path"]) self.assertIn("dup-2026-03-12T16-16-00.json", items[0]["raw_order_path"])
def test_costco_enricher_parses_size_pack_and_discount(self): def test_costco_enricher_parses_size_pack_and_discount(self):
row = enrich_costco.parse_costco_item( row = enrich_costco.parse_costco_item(