fixed sqlite copy permission error

This commit is contained in:
2026-03-16 16:18:50 -04:00
parent 4fd309251d
commit 5a331c9af4
3 changed files with 42 additions and 12 deletions

View File

@@ -149,25 +149,32 @@ def read_firefox_webapps_entries(profile_dir, origin_filters):
) )
return entries return entries
def query_sqlite(path, query): def query_sqlite(path, query):
copied_path = copy_sqlite_to_temp(path) copied_path = copy_sqlite_to_temp(path)
connection = None
cursor = None
try: try:
with sqlite3.connect(copied_path) as connection: connection = sqlite3.connect(copied_path)
return list(connection.execute(query)) cursor = connection.cursor()
cursor.execute(query)
rows = cursor.fetchall()
return rows
except sqlite3.OperationalError: except sqlite3.OperationalError:
return [] return []
finally: finally:
if cursor is not None:
cursor.close()
if connection is not None:
connection.close()
copied_path.unlink(missing_ok=True) copied_path.unlink(missing_ok=True)
def copy_sqlite_to_temp(path): def copy_sqlite_to_temp(path):
source_path = Path(path) import os, shutil, tempfile
with tempfile.NamedTemporaryFile(delete=False, suffix=source_path.suffix) as handle: fd, tmp = tempfile.mkstemp(suffix=".sqlite")
temp_path = Path(handle.name) os.close(fd)
shutil.copy2(source_path, temp_path) shutil.copyfile(path, tmp)
return temp_path return Path(tmp)
def decode_firefox_origin(raw_origin): def decode_firefox_origin(raw_origin):
origin = raw_origin.split("^", 1)[0] origin = raw_origin.split("^", 1)[0]

View File

@@ -227,6 +227,25 @@ def build_headers(auth_headers):
def build_session(): def build_session():
retailer_session = load_costco_session() retailer_session = load_costco_session()
click.echo(
"session bootstrap: "
f"cookies={bool(retailer_session.cookies)}, "
f"authorization={'costco-x-authorization' in retailer_session.headers}, "
f"client_id={'costco-x-wcs-clientId' in retailer_session.headers}, "
f"client_identifier={'client-identifier' in retailer_session.headers}"
)
auth = retailer_session.headers.get("costco-x-authorization", "")
if auth:
click.echo(
f"auth prefix ok={auth.startswith('Bearer ')} len={len(auth)} token_prefix={auth[:24]}"
)
click.echo(
"header values: "
f"client_id={retailer_session.headers.get('costco-x-wcs-clientId', '')} "
f"client_identifier={retailer_session.headers.get('client-identifier', '')}"
)
session = requests.Session() session = requests.Session()
session.cookies.update(retailer_session.cookies) session.cookies.update(retailer_session.cookies)
session.headers.update(build_headers(retailer_session.headers)) session.headers.update(build_headers(retailer_session.headers))
@@ -247,7 +266,7 @@ def graphql_post(session, query, variables):
last_response = response last_response = response
if response.status_code == 200: if response.status_code == 200:
return response.json() return response.json()
click.echo(f"retry {attempt + 1}/3 status={response.status_code}") click.echo(f"retry {attempt + 1}/3 status={response.status_code} body={response.text[:500]}")
except Exception as exc: # pragma: no cover - network error path except Exception as exc: # pragma: no cover - network error path
click.echo(f"retry {attempt + 1}/3 error={exc}") click.echo(f"retry {attempt + 1}/3 error={exc}")
time.sleep(3) time.sleep(3)

View File

@@ -15,12 +15,16 @@ class BrowserSessionTests(unittest.TestCase):
ls_dir.mkdir(parents=True) ls_dir.mkdir(parents=True)
db_path = ls_dir / "data.sqlite" db_path = ls_dir / "data.sqlite"
with sqlite3.connect(db_path) as connection: connection = sqlite3.connect(db_path)
try:
connection.execute("CREATE TABLE data (key TEXT, value TEXT)") connection.execute("CREATE TABLE data (key TEXT, value TEXT)")
connection.execute( connection.execute(
"INSERT INTO data (key, value) VALUES (?, ?)", "INSERT INTO data (key, value) VALUES (?, ?)",
("session", '{"costco":{"clientIdentifier":"481b1aec-aa3b-454b-b81b-48187e28f205"}}'), ("session", '{"costco":{"clientIdentifier":"481b1aec-aa3b-454b-b81b-48187e28f205"}}'),
) )
connection.commit()
finally:
connection.close()
entries = browser_session.read_firefox_storage_entries( entries = browser_session.read_firefox_storage_entries(
profile_dir, profile_dir,