# Files
# scrape-giant/browser_session.py
#
# 242 lines
# 6.7 KiB
# Python

import configparser
import json
import os
import shutil
import sqlite3
import tempfile
from dataclasses import dataclass
from pathlib import Path
import browser_cookie3
@dataclass
class StorageEntry:
    """A single key/value pair read from a browser storage database."""

    # Origin the pair belongs to, as computed by the reader that produced it.
    origin: str
    # Storage key, already converted to text.
    key: str
    # Storage value, already converted to text.
    value: str
    # Where the pair was read from; the readers in this file pass the
    # database file path in POSIX form.
    source: str
@dataclass
class BrowserContext:
    """Cookies and storage entries collected from one browser profile."""

    # Whatever the cookie loader returned; deliberately typed as a plain
    # object so this module does not depend on the loader's return type.
    cookies: object
    # Storage key/value pairs gathered for the requested origins.
    storage_entries: list[StorageEntry]
def load_browser_context(
    browser,
    domain_name,
    storage_origins=None,
    profile_dir=None,
):
    """Gather cookies and storage entries from a local browser profile.

    Args:
        browser: Browser identifier; only ``"firefox"`` is accepted.
        domain_name: Domain passed on to the cookie loader.
        storage_origins: Optional origin filters for the storage readers;
            a falsy value is treated as "no filter".
        profile_dir: Optional explicit profile directory. When falsy, the
            profile is located via ``find_firefox_profile_dir``.

    Returns:
        A ``BrowserContext`` holding the loaded cookies and storage entries.

    Raises:
        ValueError: If ``browser`` is anything other than ``"firefox"``.
    """
    if browser != "firefox":
        raise ValueError(f"unsupported browser: {browser}")

    if profile_dir:
        profile = Path(profile_dir)
    else:
        profile = find_firefox_profile_dir()

    origin_filters = storage_origins or []
    # Keyword arguments evaluate left to right: cookies are loaded first,
    # then the storage entries.
    return BrowserContext(
        cookies=load_firefox_cookies(domain_name, profile),
        storage_entries=read_firefox_storage_entries(
            profile,
            origin_filters=origin_filters,
        ),
    )
def find_firefox_profile_dir():
    """Return the directory of the Firefox profile most likely in use.

    ``profiles.ini`` is read from ``firefox_profiles_root()``. Candidates
    are ranked as follows:

    1. A profile whose ``Path`` is named by the ``Default`` key of an
       ``[Install...]`` section. Current Firefox versions record the
       profile an installation actually uses there, while the legacy
       ``Default=1`` flag may remain on an unused profile.
    2. A profile flagged ``Default=1`` in its own ``[Profile...]`` section.
    3. Any other profile.

    Ties are broken by the string form of the profile path, so the result
    is deterministic.

    Returns:
        ``pathlib.Path`` of the selected profile directory.

    Raises:
        FileNotFoundError: If ``profiles.ini`` does not exist or lists no
            profile with a ``Path``.
    """
    profiles_ini = firefox_profiles_root() / "profiles.ini"
    if not profiles_ini.exists():
        raise FileNotFoundError(f"Firefox profiles.ini not found at {profiles_ini}")

    parser = configparser.RawConfigParser()
    parser.read(profiles_ini, encoding="utf-8")
    sections = parser.sections()

    # Raw ``Path`` values that some installation points at as its default.
    install_defaults = {
        parser.get(section, "Default", fallback="")
        for section in sections
        if section.startswith("Install")
    }
    install_defaults.discard("")

    profiles = []
    for section in sections:
        if not section.startswith("Profile"):
            continue
        path_value = parser.get(section, "Path", fallback="")
        if not path_value:
            continue
        is_relative = parser.getboolean(section, "IsRelative", fallback=True)
        profile_path = (
            profiles_ini.parent / path_value if is_relative else Path(path_value)
        )
        profiles.append(
            (
                path_value in install_defaults,
                parser.getboolean(section, "Default", fallback=False),
                profile_path,
            )
        )

    if not profiles:
        raise FileNotFoundError("No Firefox profiles found in profiles.ini")

    # False sorts before True, so negate the flags to put preferred
    # profiles first.
    profiles.sort(key=lambda item: (not item[0], not item[1], str(item[2])))
    return profiles[0][2]
def firefox_profiles_root():
    """Return the directory expected to contain Firefox's ``profiles.ini``.

    * Windows: ``%APPDATA%/Mozilla/Firefox``.
    * macOS: ``~/Library/Application Support/Firefox``.
    * Other systems: ``~/.mozilla/firefox``. If that directory has no
      ``profiles.ini``, the Snap and Flatpak locations are checked and the
      first one containing ``profiles.ini`` is returned. When none has
      one, ``~/.mozilla/firefox`` is returned so callers report the
      conventional path.

    Raises:
        FileNotFoundError: On Windows when ``APPDATA`` is unset or blank.
    """
    # Imported locally so this function does not rely on a file-level
    # ``import sys``.
    import sys

    if os.name == "nt":
        appdata = os.getenv("APPDATA", "").strip()
        if not appdata:
            raise FileNotFoundError("APPDATA is not set")
        return Path(appdata) / "Mozilla" / "Firefox"

    home = Path.home()
    if sys.platform == "darwin":
        return home / "Library" / "Application Support" / "Firefox"

    default_root = home / ".mozilla" / "firefox"
    candidates = (
        default_root,
        home / "snap" / "firefox" / "common" / ".mozilla" / "firefox",
        home / ".var" / "app" / "org.mozilla.firefox" / ".mozilla" / "firefox",
    )
    for candidate in candidates:
        if (candidate / "profiles.ini").exists():
            return candidate
    return default_root
def load_firefox_cookies(domain_name, profile_dir):
    """Load cookies for ``domain_name`` from the profile's ``cookies.sqlite``.

    The work is delegated to ``browser_cookie3.firefox``; its return value
    is passed through unchanged.
    """
    cookie_db = Path(profile_dir, "cookies.sqlite")
    return browser_cookie3.firefox(
        cookie_file=str(cookie_db),
        domain_name=domain_name,
    )
def read_firefox_storage_entries(profile_dir, origin_filters):
    """Read storage entries from every supported store in a profile.

    Entries come first from the per-origin ``ls`` databases, then from
    ``webappsstore.sqlite``. Exact duplicates (same origin, key, value and
    source) are dropped, keeping the first occurrence and the original
    order.

    Args:
        profile_dir: Profile directory (string or path).
        origin_filters: Origin filters forwarded to the readers.

    Returns:
        A list of unique ``StorageEntry`` objects.
    """
    root = Path(profile_dir)
    # A dict keeps insertion order, so ``setdefault`` gives first-wins
    # deduplication without a separate "seen" set.
    unique = {}
    for reader in (read_firefox_ls_entries, read_firefox_webapps_entries):
        for item in reader(root, origin_filters):
            fingerprint = (item.origin, item.key, item.value, item.source)
            unique.setdefault(fingerprint, item)
    return list(unique.values())
def storage_entries_for_origin(storage_entries, origin_filters):
    """Return the entries whose origin passes ``origin_matches``.

    Input order is preserved and a new list is always returned.
    """
    matching = []
    for candidate in storage_entries:
        if origin_matches(candidate.origin, origin_filters):
            matching.append(candidate)
    return matching
def find_storage_value(storage_entries, origin_filters, key):
    """Return the value of the first matching-origin entry named ``key``.

    Returns an empty string when no such entry exists.
    """
    candidates = storage_entries_for_origin(storage_entries, origin_filters)
    return next((item.value for item in candidates if item.key == key), "")
def find_json_storage_value(storage_entries, origin_filters, key, field):
    """Look up ``field`` inside a JSON object stored under ``key``.

    Args:
        storage_entries: Entries to search.
        origin_filters: Origin filters applied before the key lookup.
        key: Storage key whose value is expected to hold JSON.
        field: Name of the member to extract from the decoded object.

    Returns:
        The member converted with ``str``. An empty string is returned
        when the key is absent or empty, the value is not valid JSON, the
        decoded JSON is not an object, or the member is missing or null.
    """
    raw_value = find_storage_value(storage_entries, origin_filters, key)
    if not raw_value:
        return ""
    try:
        payload = json.loads(raw_value)
    except json.JSONDecodeError:
        return ""
    # Valid JSON can also be a list, string, number or bool; none of those
    # has ``.get``, so treat them like "field not found".
    if not isinstance(payload, dict):
        return ""
    value = payload.get(field)
    return "" if value is None else str(value)
def list_storage_keys(storage_entries, origin_filters):
    """Return the sorted, de-duplicated non-empty keys for matching origins."""
    unique_keys = set()
    for item in storage_entries_for_origin(storage_entries, origin_filters):
        if item.key:
            unique_keys.add(item.key)
    return sorted(unique_keys)
def read_firefox_ls_entries(profile_dir, origin_filters):
    """Read entries from the per-origin ``ls/data.sqlite`` databases.

    Every ``storage/default/<origin-dir>/ls/data.sqlite`` under the profile
    is considered. The origin is decoded from the directory name, and
    databases whose origin fails the filters are skipped without being
    queried.

    Args:
        profile_dir: Profile directory as a ``pathlib.Path``.
        origin_filters: Filters passed to ``origin_matches``.

    Returns:
        A list of ``StorageEntry`` objects; empty when the
        ``storage/default`` directory does not exist.
    """
    storage_root = profile_dir / "storage" / "default"
    if not storage_root.exists():
        return []

    collected = []
    for db_path in storage_root.glob("*/ls/data.sqlite"):
        # <origin-dir>/ls/data.sqlite -> two levels up is the origin directory.
        origin = decode_firefox_origin(db_path.parent.parent.name)
        if not origin_matches(origin, origin_filters):
            continue
        source = db_path.as_posix()
        collected.extend(
            StorageEntry(
                origin=origin,
                key=stringify_sql_value(raw_key),
                value=stringify_sql_value(raw_value),
                source=source,
            )
            for raw_key, raw_value in query_sqlite(
                db_path, "SELECT key, value FROM data"
            )
        )
    return collected
def _decode_webapps_origin_key(origin_key):
    """Convert a ``webappsstore2.originKey`` into a readable origin string.

    The legacy store keeps the host reversed, followed by scheme and port,
    for example ``moc.elpmaxe.www.:https:443``. That becomes
    ``https://www.example.com``, with the port kept only when it is not
    the scheme's default. Keys that do not have this shape are returned
    unchanged.
    """
    reversed_host, separator, scheme_and_port = origin_key.partition(":")
    if not separator:
        return origin_key
    scheme, _, port = scheme_and_port.partition(":")
    # The reversed host ends with ".", which becomes a leading "." once
    # the string is flipped back.
    host = reversed_host[::-1].lstrip(".")
    if not scheme or not host:
        return origin_key
    origin = f"{scheme}://{host}"
    default_ports = {"http": "80", "https": "443"}
    if port and default_ports.get(scheme) != port:
        origin = f"{origin}:{port}"
    return origin


def read_firefox_webapps_entries(profile_dir, origin_filters):
    """Read entries from the legacy ``webappsstore.sqlite`` database.

    The ``originKey`` column is decoded into a normal origin string before
    filtering, so filters written as ordinary host names or origins
    (``example.com``) match. A filter that matches the raw key is accepted
    too, so filters written against the raw form keep selecting rows.
    The decoded origin is what gets stored on each entry.

    Args:
        profile_dir: Profile directory as a ``pathlib.Path``.
        origin_filters: Filters passed to ``origin_matches``.

    Returns:
        A list of ``StorageEntry`` objects; empty when the database file
        does not exist.
    """
    webapps_path = profile_dir / "webappsstore.sqlite"
    if not webapps_path.exists():
        return []

    source = webapps_path.as_posix()
    entries = []
    for row in query_sqlite(
        webapps_path,
        "SELECT originKey, key, value FROM webappsstore2",
    ):
        origin_key = stringify_sql_value(row[0])
        origin = _decode_webapps_origin_key(origin_key)
        if not (
            origin_matches(origin, origin_filters)
            or origin_matches(origin_key, origin_filters)
        ):
            continue
        entries.append(
            StorageEntry(
                origin=origin,
                key=stringify_sql_value(row[1]),
                value=stringify_sql_value(row[2]),
                source=source,
            )
        )
    return entries
def query_sqlite(path, query):
    """Run ``query`` against a temporary copy of the database at ``path``.

    The database is copied first so a file the browser may have open is
    never opened directly. The lookup is best effort: any SQLite database
    error yields an empty list instead of an exception. That covers a
    missing table as well as a copy that is not a valid database (for
    example because it was taken mid-write).

    Args:
        path: Path of the SQLite database to read.
        query: SQL text to execute.

    Returns:
        All result rows as a list of tuples, or ``[]`` on a database error.
    """
    copied_path = copy_sqlite_to_temp(path)
    connection = None
    cursor = None
    try:
        connection = sqlite3.connect(copied_path)
        cursor = connection.cursor()
        cursor.execute(query)
        return cursor.fetchall()
    except sqlite3.DatabaseError:
        # DatabaseError is the base class of OperationalError; it also
        # covers "file is not a database" and "disk image is malformed".
        return []
    finally:
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()
        # SQLite may create sidecar files next to the copy; remove them
        # together with the copy so nothing is left in the temp directory.
        for suffix in ("", "-wal", "-shm", "-journal"):
            Path(f"{copied_path}{suffix}").unlink(missing_ok=True)
def copy_sqlite_to_temp(path):
    """Copy the file at ``path`` to a fresh ``.sqlite`` temporary file.

    Only the file itself is copied. The caller owns the returned file and
    is responsible for deleting it.

    NOTE(review): rows that exist only in a ``-wal`` sidecar of the source
    database are not part of the copy; confirm that is acceptable.

    Args:
        path: Source file to copy.

    Returns:
        ``pathlib.Path`` of the temporary copy.

    Raises:
        OSError: If the source cannot be copied. The temporary file is
            removed before the error propagates.
    """
    fd, tmp = tempfile.mkstemp(suffix=".sqlite")
    # mkstemp returns an open descriptor; close it so copyfile can reopen
    # the path (required on Windows).
    os.close(fd)
    tmp_path = Path(tmp)
    try:
        shutil.copyfile(path, tmp_path)
    except OSError:
        # Do not leave an empty temp file behind when the copy fails.
        tmp_path.unlink(missing_ok=True)
        raise
    return tmp_path
def decode_firefox_origin(raw_origin):
    """Turn a Firefox storage directory name back into an origin string.

    Firefox replaces characters that are unsafe in file names with ``+``,
    so ``https://host:8443`` is stored as ``https+++host+8443``. Anything
    after ``^`` (origin attributes such as ``userContextId``) is dropped.

    Args:
        raw_origin: Directory name, e.g. ``"https+++www.example.com^userContextId=1"``.

    Returns:
        The origin with ``+++`` restored to ``://`` and a trailing
        ``+<digits>`` restored to ``:<port>``. Other ``+`` characters are
        left alone because the character they replaced cannot be known.
    """
    origin = raw_origin.split("^", 1)[0].replace("+++", "://")
    head, plus, tail = origin.rpartition("+")
    # A purely numeric last segment is the port; restore its colon.
    if plus and head and tail.isascii() and tail.isdigit():
        return f"{head}:{tail}"
    return origin
def origin_matches(origin, origin_filters):
    """Report whether ``origin`` passes the given filters.

    With no filters (empty or ``None``) every origin passes. Otherwise the
    origin passes when at least one filter occurs in it as a
    case-insensitive substring.
    """
    if not origin_filters:
        return True
    haystack = origin.lower()
    for needle in origin_filters:
        if needle.lower() in haystack:
            return True
    return False
def stringify_sql_value(value):
    """Convert a value read from SQLite into text.

    ``None`` becomes an empty string. ``bytes`` are decoded with the first
    of UTF-8, UTF-16-LE and UTF-16 that succeeds; if none does, they are
    decoded as UTF-8 with undecodable bytes dropped. Any other value goes
    through ``str``.
    """
    if value is None:
        return ""
    if not isinstance(value, bytes):
        return str(value)

    for codec in ("utf-8", "utf-16-le", "utf-16"):
        try:
            decoded = value.decode(codec)
        except UnicodeDecodeError:
            continue
        return decoded
    # Last resort: keep whatever is valid UTF-8 and discard the rest.
    return value.decode("utf-8", errors="ignore")