Simplify browser session bootstrap
This commit is contained in:
@@ -1,47 +1,13 @@
|
||||
import configparser
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import sqlite3
|
||||
import tempfile
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
import browser_cookie3
|
||||
|
||||
|
||||
@dataclass
|
||||
class StorageEntry:
|
||||
origin: str
|
||||
key: str
|
||||
value: str
|
||||
source: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class BrowserContext:
|
||||
cookies: object
|
||||
storage_entries: list[StorageEntry]
|
||||
|
||||
|
||||
def load_browser_context(
|
||||
browser,
|
||||
domain_name,
|
||||
storage_origins=None,
|
||||
profile_dir=None,
|
||||
):
|
||||
if browser != "firefox":
|
||||
raise ValueError(f"unsupported browser: {browser}")
|
||||
|
||||
profile = Path(profile_dir) if profile_dir else find_firefox_profile_dir()
|
||||
cookies = load_firefox_cookies(domain_name, profile)
|
||||
storage_entries = read_firefox_storage_entries(
|
||||
profile,
|
||||
origin_filters=storage_origins or [],
|
||||
)
|
||||
return BrowserContext(cookies=cookies, storage_entries=storage_entries)
|
||||
|
||||
|
||||
def find_firefox_profile_dir():
|
||||
profiles_ini = firefox_profiles_root() / "profiles.ini"
|
||||
parser = configparser.RawConfigParser()
|
||||
@@ -88,106 +54,37 @@ def load_firefox_cookies(domain_name, profile_dir):
|
||||
return browser_cookie3.firefox(cookie_file=str(cookie_file), domain_name=domain_name)
|
||||
|
||||
|
||||
def read_firefox_storage_entries(profile_dir, origin_filters):
|
||||
profile_dir = Path(profile_dir)
|
||||
entries = []
|
||||
entries.extend(read_firefox_ls_entries(profile_dir, origin_filters))
|
||||
entries.extend(read_firefox_webapps_entries(profile_dir, origin_filters))
|
||||
|
||||
deduped = []
|
||||
seen = set()
|
||||
for entry in entries:
|
||||
key = (entry.origin, entry.key, entry.value, entry.source)
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
deduped.append(entry)
|
||||
return deduped
|
||||
|
||||
|
||||
def storage_entries_for_origin(storage_entries, origin_filters):
|
||||
return [
|
||||
entry
|
||||
for entry in storage_entries
|
||||
if origin_matches(entry.origin, origin_filters)
|
||||
]
|
||||
|
||||
|
||||
def find_storage_value(storage_entries, origin_filters, key):
|
||||
for entry in storage_entries_for_origin(storage_entries, origin_filters):
|
||||
if entry.key == key:
|
||||
return entry.value
|
||||
return ""
|
||||
|
||||
|
||||
def find_json_storage_value(storage_entries, origin_filters, key, field):
|
||||
raw_value = find_storage_value(storage_entries, origin_filters, key)
|
||||
if not raw_value:
|
||||
return ""
|
||||
try:
|
||||
payload = json.loads(raw_value)
|
||||
except json.JSONDecodeError:
|
||||
return ""
|
||||
value = payload.get(field, "")
|
||||
if value is None:
|
||||
return ""
|
||||
return str(value)
|
||||
|
||||
|
||||
def list_storage_keys(storage_entries, origin_filters):
|
||||
return sorted(
|
||||
{
|
||||
entry.key
|
||||
for entry in storage_entries_for_origin(storage_entries, origin_filters)
|
||||
if entry.key
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def read_firefox_ls_entries(profile_dir, origin_filters):
|
||||
entries = []
|
||||
def read_firefox_local_storage(profile_dir, origin_filter):
|
||||
storage_root = profile_dir / "storage" / "default"
|
||||
if not storage_root.exists():
|
||||
return entries
|
||||
return {}
|
||||
|
||||
for ls_path in storage_root.glob("*/ls/data.sqlite"):
|
||||
origin = decode_firefox_origin(ls_path.parents[1].name)
|
||||
if not origin_matches(origin, origin_filters):
|
||||
if origin_filter.lower() not in origin.lower():
|
||||
continue
|
||||
for row in query_sqlite(ls_path, "SELECT key, value FROM data"):
|
||||
entries.append(
|
||||
StorageEntry(
|
||||
origin=origin,
|
||||
key=stringify_sql_value(row[0]),
|
||||
value=stringify_sql_value(row[1]),
|
||||
source=ls_path.as_posix(),
|
||||
)
|
||||
)
|
||||
return entries
|
||||
return {
|
||||
stringify_sql_value(row[0]): stringify_sql_value(row[1])
|
||||
for row in query_sqlite(ls_path, "SELECT key, value FROM data")
|
||||
}
|
||||
return {}
|
||||
|
||||
|
||||
def read_firefox_webapps_entries(profile_dir, origin_filters):
|
||||
def read_firefox_webapps_store(profile_dir, origin_filter):
|
||||
webapps_path = profile_dir / "webappsstore.sqlite"
|
||||
if not webapps_path.exists():
|
||||
return []
|
||||
return {}
|
||||
|
||||
entries = []
|
||||
values = {}
|
||||
for row in query_sqlite(
|
||||
webapps_path,
|
||||
"SELECT originKey, key, value FROM webappsstore2",
|
||||
):
|
||||
origin = stringify_sql_value(row[0])
|
||||
if not origin_matches(origin, origin_filters):
|
||||
if origin_filter.lower() not in origin.lower():
|
||||
continue
|
||||
entries.append(
|
||||
StorageEntry(
|
||||
origin=origin,
|
||||
key=stringify_sql_value(row[1]),
|
||||
value=stringify_sql_value(row[2]),
|
||||
source=webapps_path.as_posix(),
|
||||
)
|
||||
)
|
||||
return entries
|
||||
values[stringify_sql_value(row[1])] = stringify_sql_value(row[2])
|
||||
return values
|
||||
|
||||
def query_sqlite(path, query):
|
||||
copied_path = copy_sqlite_to_temp(path)
|
||||
@@ -210,7 +107,6 @@ def query_sqlite(path, query):
|
||||
|
||||
|
||||
def copy_sqlite_to_temp(path):
|
||||
import os, shutil, tempfile
|
||||
fd, tmp = tempfile.mkstemp(suffix=".sqlite")
|
||||
os.close(fd)
|
||||
shutil.copyfile(path, tmp)
|
||||
@@ -220,14 +116,6 @@ def decode_firefox_origin(raw_origin):
|
||||
origin = raw_origin.split("^", 1)[0]
|
||||
return origin.replace("+++", "://")
|
||||
|
||||
|
||||
def origin_matches(origin, origin_filters):
|
||||
if not origin_filters:
|
||||
return True
|
||||
normalized_origin = origin.lower()
|
||||
return any(filter_value.lower() in normalized_origin for filter_value in origin_filters)
|
||||
|
||||
|
||||
def stringify_sql_value(value):
|
||||
if value is None:
|
||||
return ""
|
||||
|
||||
Reference in New Issue
Block a user