Add shared browser session bootstrap

This commit is contained in:
ben
2026-03-16 13:54:00 -04:00
parent 0f797d0a96
commit 7789c2e6ae
7 changed files with 741 additions and 32 deletions

194
browser_session.py Normal file
View File

@@ -0,0 +1,194 @@
import configparser
import os
import shutil
import sqlite3
import tempfile
from dataclasses import dataclass
from pathlib import Path
import browser_cookie3
@dataclass
class StorageEntry:
origin: str
key: str
value: str
source: str
@dataclass
class BrowserContext:
cookies: object
storage_entries: list[StorageEntry]
def load_browser_context(
browser,
domain_name,
storage_origins=None,
profile_dir=None,
):
if browser != "firefox":
raise ValueError(f"unsupported browser: {browser}")
profile = Path(profile_dir) if profile_dir else find_firefox_profile_dir()
cookies = load_firefox_cookies(domain_name, profile)
storage_entries = read_firefox_storage_entries(
profile,
origin_filters=storage_origins or [],
)
return BrowserContext(cookies=cookies, storage_entries=storage_entries)
def find_firefox_profile_dir():
profiles_ini = firefox_profiles_root() / "profiles.ini"
parser = configparser.RawConfigParser()
if not profiles_ini.exists():
raise FileNotFoundError(f"Firefox profiles.ini not found at {profiles_ini}")
parser.read(profiles_ini, encoding="utf-8")
profiles = []
for section in parser.sections():
if not section.startswith("Profile"):
continue
path_value = parser.get(section, "Path", fallback="")
if not path_value:
continue
is_relative = parser.getboolean(section, "IsRelative", fallback=True)
profile_path = (
profiles_ini.parent / path_value if is_relative else Path(path_value)
)
profiles.append(
(
parser.getboolean(section, "Default", fallback=False),
profile_path,
)
)
if not profiles:
raise FileNotFoundError("No Firefox profiles found in profiles.ini")
profiles.sort(key=lambda item: (not item[0], str(item[1])))
return profiles[0][1]
def firefox_profiles_root():
if os.name == "nt":
appdata = os.getenv("APPDATA", "").strip()
if not appdata:
raise FileNotFoundError("APPDATA is not set")
return Path(appdata) / "Mozilla" / "Firefox"
return Path.home() / ".mozilla" / "firefox"
def load_firefox_cookies(domain_name, profile_dir):
cookie_file = Path(profile_dir) / "cookies.sqlite"
return browser_cookie3.firefox(cookie_file=str(cookie_file), domain_name=domain_name)
def read_firefox_storage_entries(profile_dir, origin_filters):
profile_dir = Path(profile_dir)
entries = []
entries.extend(read_firefox_ls_entries(profile_dir, origin_filters))
entries.extend(read_firefox_webapps_entries(profile_dir, origin_filters))
deduped = []
seen = set()
for entry in entries:
key = (entry.origin, entry.key, entry.value, entry.source)
if key in seen:
continue
seen.add(key)
deduped.append(entry)
return deduped
def read_firefox_ls_entries(profile_dir, origin_filters):
entries = []
storage_root = profile_dir / "storage" / "default"
if not storage_root.exists():
return entries
for ls_path in storage_root.glob("*/ls/data.sqlite"):
origin = decode_firefox_origin(ls_path.parents[1].name)
if not origin_matches(origin, origin_filters):
continue
for row in query_sqlite(ls_path, "SELECT key, value FROM data"):
entries.append(
StorageEntry(
origin=origin,
key=stringify_sql_value(row[0]),
value=stringify_sql_value(row[1]),
source=ls_path.as_posix(),
)
)
return entries
def read_firefox_webapps_entries(profile_dir, origin_filters):
webapps_path = profile_dir / "webappsstore.sqlite"
if not webapps_path.exists():
return []
entries = []
for row in query_sqlite(
webapps_path,
"SELECT originKey, key, value FROM webappsstore2",
):
origin = stringify_sql_value(row[0])
if not origin_matches(origin, origin_filters):
continue
entries.append(
StorageEntry(
origin=origin,
key=stringify_sql_value(row[1]),
value=stringify_sql_value(row[2]),
source=webapps_path.as_posix(),
)
)
return entries
def query_sqlite(path, query):
copied_path = copy_sqlite_to_temp(path)
try:
with sqlite3.connect(copied_path) as connection:
return list(connection.execute(query))
except sqlite3.OperationalError:
return []
finally:
copied_path.unlink(missing_ok=True)
def copy_sqlite_to_temp(path):
source_path = Path(path)
with tempfile.NamedTemporaryFile(delete=False, suffix=source_path.suffix) as handle:
temp_path = Path(handle.name)
shutil.copy2(source_path, temp_path)
return temp_path
def decode_firefox_origin(raw_origin):
origin = raw_origin.split("^", 1)[0]
return origin.replace("+++", "://")
def origin_matches(origin, origin_filters):
if not origin_filters:
return True
normalized_origin = origin.lower()
return any(filter_value.lower() in normalized_origin for filter_value in origin_filters)
def stringify_sql_value(value):
if value is None:
return ""
if isinstance(value, bytes):
for encoding in ("utf-8", "utf-16-le", "utf-16"):
try:
return value.decode(encoding)
except UnicodeDecodeError:
continue
return value.decode("utf-8", errors="ignore")
return str(value)

136
retailer_sessions.py Normal file
View File

@@ -0,0 +1,136 @@
import json
import re
from dataclasses import dataclass
from browser_session import load_browser_context
UUID_RE = re.compile(
r"^[0-9a-fA-F]{8}-"
r"[0-9a-fA-F]{4}-"
r"[0-9a-fA-F]{4}-"
r"[0-9a-fA-F]{4}-"
r"[0-9a-fA-F]{12}$"
)
JWT_RE = re.compile(r"^[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+$")
@dataclass
class RetailerSession:
cookies: object
headers: dict[str, str]
def load_giant_session(browser="firefox", profile_dir=None):
context = load_browser_context(
browser=browser,
domain_name="giantfood.com",
storage_origins=["giantfood.com"],
profile_dir=profile_dir,
)
return RetailerSession(cookies=context.cookies, headers={})
def load_costco_session(browser="firefox", profile_dir=None):
context = load_browser_context(
browser=browser,
domain_name=".costco.com",
storage_origins=["costco.com"],
profile_dir=profile_dir,
)
return RetailerSession(
cookies=context.cookies,
headers=extract_costco_headers(context.storage_entries),
)
def extract_costco_headers(storage_entries):
authorization = ""
client_id = ""
client_identifier = ""
for key_path, value in iter_storage_candidates(storage_entries):
normalized_key = normalize_key(key_path)
normalized_value = str(value).strip()
if not normalized_value:
continue
if not authorization and looks_like_authorization(normalized_key, normalized_value):
authorization = normalize_authorization(normalized_value)
continue
if not client_identifier and looks_like_client_identifier(
normalized_key, normalized_value
):
client_identifier = normalized_value
continue
if not client_id and looks_like_client_id(normalized_key, normalized_value):
client_id = normalized_value
headers = {}
if authorization:
headers["costco-x-authorization"] = authorization
if client_id:
headers["costco-x-wcs-clientId"] = client_id
if client_identifier:
headers["client-identifier"] = client_identifier
return headers
def iter_storage_candidates(storage_entries):
for entry in storage_entries:
yield entry.key, entry.value
yield from walk_candidate_value(entry.key, parse_json_value(entry.value))
def walk_candidate_value(prefix, value):
if isinstance(value, dict):
for key, nested in value.items():
nested_prefix = f"{prefix}.{key}"
yield nested_prefix, nested
yield from walk_candidate_value(nested_prefix, nested)
elif isinstance(value, list):
for index, nested in enumerate(value):
nested_prefix = f"{prefix}[{index}]"
yield nested_prefix, nested
yield from walk_candidate_value(nested_prefix, nested)
def parse_json_value(value):
if not isinstance(value, str):
return value
text = value.strip()
if not text or text[0] not in "{[":
return value
try:
return json.loads(text)
except json.JSONDecodeError:
return value
def normalize_key(value):
return re.sub(r"[^a-z0-9]+", "", value.lower())
def looks_like_authorization(key, value):
return (
("authorization" in key or "token" in key)
and bool(normalize_authorization(value))
)
def normalize_authorization(value):
candidate = str(value).strip()
if candidate.lower().startswith("bearer "):
token = candidate.split(None, 1)[1].strip()
return f"Bearer {token}" if JWT_RE.match(token) else ""
if JWT_RE.match(candidate):
return f"Bearer {candidate}"
return ""
def looks_like_client_id(key, value):
return "clientid" in key and "identifier" not in key and bool(UUID_RE.match(value))
def looks_like_client_identifier(key, value):
return "clientidentifier" in key and bool(UUID_RE.match(value))

View File

@@ -1,17 +1,16 @@
import os
import csv
import json
import time
import re
from dotenv import load_dotenv
from calendar import monthrange
from datetime import datetime, timedelta
from pathlib import Path
import click
import browser_cookie3
from curl_cffi import requests
from retailer_sessions import load_costco_session
BASE_URL = "https://ecom-api.costco.com/ebusiness/order/v1/orders/graphql"
RETAILER = "costco"
@@ -210,16 +209,7 @@ ITEM_FIELDS = [
"is_coupon_line",
]
def load_config():
load_dotenv()
return {
"authorization": os.getenv("COSTCO_X_AUTHORIZATION", "").strip(),
"client_id": os.getenv("COSTCO_X_WCS_CLIENTID", "").strip(),
"client_identifier": os.getenv("COSTCO_CLIENT_IDENTIFIER", "").strip(),
}
def build_headers(config):
def build_headers(auth_headers):
headers = {
"accept": "*/*",
"content-type": "application/json-patch+json",
@@ -232,18 +222,14 @@ def build_headers(config):
"Gecko/20100101 Firefox/148.0"
),
}
if config["authorization"]:
headers["costco-x-authorization"] = config["authorization"]
if config["client_id"]:
headers["costco-x-wcs-clientId"] = config["client_id"]
if config["client_identifier"]:
headers["client-identifier"] = config["client_identifier"]
headers.update(auth_headers)
return headers
def build_session(config):
def build_session():
retailer_session = load_costco_session()
session = requests.Session()
session.cookies.update(browser_cookie3.firefox(domain_name=".costco.com"))
session.headers.update(build_headers(config))
session.cookies.update(retailer_session.cookies)
session.headers.update(build_headers(retailer_session.headers))
return session
@@ -596,17 +582,10 @@ def main(outdir, document_type, document_sub_type, window_days, months_back):
outdir = Path(outdir)
raw_dir = outdir / "raw"
try:
config = load_config()
click.echo(
"auth headers present: "
f"authorization={bool(config['authorization'])}, "
f"client_id={bool(config['client_id'])}, "
f"client_identifier={bool(config['client_identifier'])}"
)
session = build_session(config)
session = build_session()
except Exception as exc:
raise click.ClickException(
f"failed to load Costco Firefox cookies: {exc}"
f"failed to load Costco browser session: {exc}"
) from exc
start_date, end_date = resolve_date_range(months_back)

333
scrape_giant.py Normal file
View File

@@ -0,0 +1,333 @@
import csv
import json
import os
import time
from pathlib import Path
import click
from dotenv import load_dotenv
from curl_cffi import requests
from retailer_sessions import load_giant_session
BASE = "https://giantfood.com"
ACCOUNT_PAGE = f"{BASE}/account/history/invoice/in-store"
ORDER_FIELDS = [
"order_id",
"order_date",
"delivery_date",
"service_type",
"order_total",
"payment_method",
"total_item_count",
"total_savings",
"your_savings_total",
"coupons_discounts_total",
"store_name",
"store_number",
"store_address1",
"store_city",
"store_state",
"store_zipcode",
"refund_order",
"ebt_order",
]
ITEM_FIELDS = [
"order_id",
"order_date",
"line_no",
"pod_id",
"item_name",
"upc",
"category_id",
"category",
"qty",
"unit",
"unit_price",
"line_total",
"picked_weight",
"mvp_savings",
"reward_savings",
"coupon_savings",
"coupon_price",
]
def load_config():
if load_dotenv is not None:
load_dotenv()
return {
"user_id": os.getenv("GIANT_USER_ID", "").strip(),
"loyalty": os.getenv("GIANT_LOYALTY_NUMBER", "").strip(),
}
def build_session():
browser_session = load_giant_session()
session = requests.Session()
session.cookies.update(browser_session.cookies)
session.headers.update(
{
"user-agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) "
"Gecko/20100101 Firefox/148.0"
),
"accept": "application/json, text/plain, */*",
"accept-language": "en-US,en;q=0.9",
"referer": ACCOUNT_PAGE,
}
)
return session
def safe_get(session, url, **kwargs):
last_response = None
for attempt in range(3):
try:
response = session.get(
url,
impersonate="firefox",
timeout=30,
**kwargs,
)
last_response = response
if response.status_code == 200:
return response
click.echo(f"retry {attempt + 1}/3 status={response.status_code}")
except Exception as exc: # pragma: no cover - network error path
click.echo(f"retry {attempt + 1}/3 error={exc}")
time.sleep(3)
if last_response is not None:
last_response.raise_for_status()
raise RuntimeError(f"failed to fetch {url}")
def get_history(session, user_id, loyalty):
response = safe_get(
session,
f"{BASE}/api/v6.0/user/{user_id}/order/history",
params={"filter": "instore", "loyaltyNumber": loyalty},
)
return response.json()
def get_order_detail(session, user_id, order_id):
response = safe_get(
session,
f"{BASE}/api/v6.0/user/{user_id}/order/history/detail/{order_id}",
params={"isInStore": "true"},
)
return response.json()
def flatten_orders(history, details):
orders = []
items = []
history_lookup = {record["orderId"]: record for record in history.get("records", [])}
for detail in details:
order_id = str(detail["orderId"])
history_row = history_lookup.get(detail["orderId"], {})
pickup = detail.get("pup", {})
orders.append(
{
"order_id": order_id,
"order_date": detail.get("orderDate"),
"delivery_date": detail.get("deliveryDate"),
"service_type": history_row.get("serviceType"),
"order_total": detail.get("orderTotal"),
"payment_method": detail.get("paymentMethod"),
"total_item_count": detail.get("totalItemCount"),
"total_savings": detail.get("totalSavings"),
"your_savings_total": detail.get("yourSavingsTotal"),
"coupons_discounts_total": detail.get("couponsDiscountsTotal"),
"store_name": pickup.get("storeName"),
"store_number": pickup.get("aholdStoreNumber"),
"store_address1": pickup.get("storeAddress1"),
"store_city": pickup.get("storeCity"),
"store_state": pickup.get("storeState"),
"store_zipcode": pickup.get("storeZipcode"),
"refund_order": detail.get("refundOrder"),
"ebt_order": detail.get("ebtOrder"),
}
)
for line_no, item in enumerate(detail.get("items", []), start=1):
items.append(
{
"order_id": order_id,
"order_date": detail.get("orderDate"),
"line_no": str(line_no),
"pod_id": item.get("podId"),
"item_name": item.get("itemName"),
"upc": item.get("primUpcCd"),
"category_id": item.get("categoryId"),
"category": item.get("categoryDesc"),
"qty": item.get("shipQy"),
"unit": item.get("lbEachCd"),
"unit_price": item.get("unitPrice"),
"line_total": item.get("groceryAmount"),
"picked_weight": item.get("totalPickedWeight"),
"mvp_savings": item.get("mvpSavings"),
"reward_savings": item.get("rewardSavings"),
"coupon_savings": item.get("couponSavings"),
"coupon_price": item.get("couponPrice"),
}
)
return orders, items
def normalize_row(row, fieldnames):
return {field: stringify(row.get(field)) for field in fieldnames}
def stringify(value):
if value is None:
return ""
return str(value)
def read_csv_rows(path):
if not path.exists():
return [], []
with path.open(newline="", encoding="utf-8") as handle:
reader = csv.DictReader(handle)
fieldnames = reader.fieldnames or []
return fieldnames, list(reader)
def read_existing_order_ids(path):
_, rows = read_csv_rows(path)
return {row["order_id"] for row in rows if row.get("order_id")}
def merge_rows(existing_rows, new_rows, subset):
merged = []
row_index = {}
for row in existing_rows + new_rows:
key = tuple(stringify(row.get(field)) for field in subset)
normalized = dict(row)
if key in row_index:
merged[row_index[key]] = normalized
else:
row_index[key] = len(merged)
merged.append(normalized)
return merged
def append_dedup(path, new_rows, subset, fieldnames):
existing_fieldnames, existing_rows = read_csv_rows(path)
all_fieldnames = list(dict.fromkeys(existing_fieldnames + fieldnames))
merged = merge_rows(
[normalize_row(row, all_fieldnames) for row in existing_rows],
[normalize_row(row, all_fieldnames) for row in new_rows],
subset=subset,
)
with path.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=all_fieldnames)
writer.writeheader()
writer.writerows(merged)
return merged
def write_json(path, payload):
path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
@click.command()
@click.option("--user-id", default=None, help="Giant user id.")
@click.option("--loyalty", default=None, help="Giant loyalty number.")
@click.option(
"--outdir",
default="giant_output",
show_default=True,
help="Directory for raw json and csv outputs.",
)
@click.option(
"--sleep-seconds",
default=1.5,
show_default=True,
type=float,
help="Delay between order detail requests.",
)
def main(user_id, loyalty, outdir, sleep_seconds):
config = load_config()
user_id = user_id or config["user_id"] or click.prompt("Giant user id", type=str)
loyalty = loyalty or config["loyalty"] or click.prompt(
"Giant loyalty number", type=str
)
outdir = Path(outdir)
rawdir = outdir / "raw"
rawdir.mkdir(parents=True, exist_ok=True)
orders_csv = outdir / "orders.csv"
items_csv = outdir / "items.csv"
existing_order_ids = read_existing_order_ids(orders_csv)
session = build_session()
history = get_history(session, user_id, loyalty)
write_json(rawdir / "history.json", history)
records = history.get("records", [])
click.echo(f"history returned {len(records)} visits; Giant exposes only the most recent 50")
unseen_records = [
record
for record in records
if stringify(record.get("orderId")) not in existing_order_ids
]
click.echo(
f"found {len(unseen_records)} unseen visits "
f"({len(existing_order_ids)} already stored)"
)
details = []
for index, record in enumerate(unseen_records, start=1):
order_id = stringify(record.get("orderId"))
click.echo(f"[{index}/{len(unseen_records)}] fetching {order_id}")
detail = get_order_detail(session, user_id, order_id)
write_json(rawdir / f"{order_id}.json", detail)
details.append(detail)
if index < len(unseen_records):
time.sleep(sleep_seconds)
orders, items = flatten_orders(history, details)
merged_orders = append_dedup(
orders_csv,
orders,
subset=["order_id"],
fieldnames=ORDER_FIELDS,
)
merged_items = append_dedup(
items_csv,
items,
subset=["order_id", "line_no"],
fieldnames=ITEM_FIELDS,
)
click.echo(
f"wrote {len(orders)} new orders / {len(items)} new items "
f"({len(merged_orders)} total orders, {len(merged_items)} total items)"
)
if __name__ == "__main__":
main()

5
scraper.py Normal file
View File

@@ -0,0 +1,5 @@
from scrape_giant import * # noqa: F401,F403
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,62 @@
import sqlite3
import tempfile
import unittest
from pathlib import Path
import browser_session
import retailer_sessions
class BrowserSessionTests(unittest.TestCase):
def test_read_firefox_ls_entries_reads_storage_from_copied_sqlite(self):
with tempfile.TemporaryDirectory() as tmpdir:
profile_dir = Path(tmpdir) / "abcd.default-release"
ls_dir = profile_dir / "storage" / "default" / "https+++www.costco.com" / "ls"
ls_dir.mkdir(parents=True)
db_path = ls_dir / "data.sqlite"
with sqlite3.connect(db_path) as connection:
connection.execute("CREATE TABLE data (key TEXT, value TEXT)")
connection.execute(
"INSERT INTO data (key, value) VALUES (?, ?)",
("session", '{"costco":{"clientIdentifier":"481b1aec-aa3b-454b-b81b-48187e28f205"}}'),
)
entries = browser_session.read_firefox_storage_entries(
profile_dir,
origin_filters=["costco.com"],
)
self.assertEqual(1, len(entries))
self.assertEqual("https://www.costco.com", entries[0].origin)
self.assertEqual("session", entries[0].key)
def test_extract_costco_headers_from_storage_json(self):
entries = [
browser_session.StorageEntry(
origin="https://www.costco.com",
key="authState",
value=(
'{"authorization":"Bearer header.payload.signature",'
'"wcsClientId":"4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",'
'"clientIdentifier":"481b1aec-aa3b-454b-b81b-48187e28f205"}'
),
source="memory",
)
]
headers = retailer_sessions.extract_costco_headers(entries)
self.assertEqual("Bearer header.payload.signature", headers["costco-x-authorization"])
self.assertEqual(
"4900eb1f-0c10-4bd9-99c3-c59e6c1ecebf",
headers["costco-x-wcs-clientId"],
)
self.assertEqual(
"481b1aec-aa3b-454b-b81b-48187e28f205",
headers["client-identifier"],
)
if __name__ == "__main__":
unittest.main()

View File

@@ -234,7 +234,7 @@ class CostcoPipelineTests(unittest.TestCase):
self.assertEqual("VISA", orders[0]["payment_method"])
self.assertEqual("true", items[0]["is_coupon_line"])
self.assertIn("dup::2026-03-12T16:16:00.json", items[0]["raw_order_path"])
self.assertIn("dup-2026-03-12T16-16-00.json", items[0]["raw_order_path"])
def test_costco_enricher_parses_size_pack_and_discount(self):
row = enrich_costco.parse_costco_item(