137 lines
3.9 KiB
Python
137 lines
3.9 KiB
Python
import json
|
|
import re
|
|
from dataclasses import dataclass
|
|
|
|
from browser_session import load_browser_context
|
|
|
|
|
|
UUID_RE = re.compile(
|
|
r"^[0-9a-fA-F]{8}-"
|
|
r"[0-9a-fA-F]{4}-"
|
|
r"[0-9a-fA-F]{4}-"
|
|
r"[0-9a-fA-F]{4}-"
|
|
r"[0-9a-fA-F]{12}$"
|
|
)
|
|
JWT_RE = re.compile(r"^[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+$")
|
|
|
|
|
|
@dataclass
|
|
class RetailerSession:
|
|
cookies: object
|
|
headers: dict[str, str]
|
|
|
|
|
|
def load_giant_session(browser="firefox", profile_dir=None):
|
|
context = load_browser_context(
|
|
browser=browser,
|
|
domain_name="giantfood.com",
|
|
storage_origins=["giantfood.com"],
|
|
profile_dir=profile_dir,
|
|
)
|
|
return RetailerSession(cookies=context.cookies, headers={})
|
|
|
|
|
|
def load_costco_session(browser="firefox", profile_dir=None):
|
|
context = load_browser_context(
|
|
browser=browser,
|
|
domain_name=".costco.com",
|
|
storage_origins=["costco.com"],
|
|
profile_dir=profile_dir,
|
|
)
|
|
return RetailerSession(
|
|
cookies=context.cookies,
|
|
headers=extract_costco_headers(context.storage_entries),
|
|
)
|
|
|
|
|
|
def extract_costco_headers(storage_entries):
|
|
authorization = ""
|
|
client_id = ""
|
|
client_identifier = ""
|
|
|
|
for key_path, value in iter_storage_candidates(storage_entries):
|
|
normalized_key = normalize_key(key_path)
|
|
normalized_value = str(value).strip()
|
|
if not normalized_value:
|
|
continue
|
|
|
|
if not authorization and looks_like_authorization(normalized_key, normalized_value):
|
|
authorization = normalize_authorization(normalized_value)
|
|
continue
|
|
if not client_identifier and looks_like_client_identifier(
|
|
normalized_key, normalized_value
|
|
):
|
|
client_identifier = normalized_value
|
|
continue
|
|
if not client_id and looks_like_client_id(normalized_key, normalized_value):
|
|
client_id = normalized_value
|
|
|
|
headers = {}
|
|
if authorization:
|
|
headers["costco-x-authorization"] = authorization
|
|
if client_id:
|
|
headers["costco-x-wcs-clientId"] = client_id
|
|
if client_identifier:
|
|
headers["client-identifier"] = client_identifier
|
|
return headers
|
|
|
|
|
|
def iter_storage_candidates(storage_entries):
|
|
for entry in storage_entries:
|
|
yield entry.key, entry.value
|
|
yield from walk_candidate_value(entry.key, parse_json_value(entry.value))
|
|
|
|
|
|
def walk_candidate_value(prefix, value):
|
|
if isinstance(value, dict):
|
|
for key, nested in value.items():
|
|
nested_prefix = f"{prefix}.{key}"
|
|
yield nested_prefix, nested
|
|
yield from walk_candidate_value(nested_prefix, nested)
|
|
elif isinstance(value, list):
|
|
for index, nested in enumerate(value):
|
|
nested_prefix = f"{prefix}[{index}]"
|
|
yield nested_prefix, nested
|
|
yield from walk_candidate_value(nested_prefix, nested)
|
|
|
|
|
|
def parse_json_value(value):
|
|
if not isinstance(value, str):
|
|
return value
|
|
text = value.strip()
|
|
if not text or text[0] not in "{[":
|
|
return value
|
|
try:
|
|
return json.loads(text)
|
|
except json.JSONDecodeError:
|
|
return value
|
|
|
|
|
|
def normalize_key(value):
|
|
return re.sub(r"[^a-z0-9]+", "", value.lower())
|
|
|
|
|
|
def looks_like_authorization(key, value):
|
|
return (
|
|
("authorization" in key or "token" in key)
|
|
and bool(normalize_authorization(value))
|
|
)
|
|
|
|
|
|
def normalize_authorization(value):
|
|
candidate = str(value).strip()
|
|
if candidate.lower().startswith("bearer "):
|
|
token = candidate.split(None, 1)[1].strip()
|
|
return f"Bearer {token}" if JWT_RE.match(token) else ""
|
|
if JWT_RE.match(candidate):
|
|
return f"Bearer {candidate}"
|
|
return ""
|
|
|
|
|
|
def looks_like_client_id(key, value):
|
|
return "clientid" in key and "identifier" not in key and bool(UUID_RE.match(value))
|
|
|
|
|
|
def looks_like_client_identifier(key, value):
|
|
return "clientidentifier" in key and bool(UUID_RE.match(value))
|