diff --git a/analysis/gpt4o/analysis_batch.py b/analysis/gpt4o/analysis_batch.py index df4f595..cd883d5 100644 --- a/analysis/gpt4o/analysis_batch.py +++ b/analysis/gpt4o/analysis_batch.py @@ -1,27 +1,23 @@ #!/usr/bin/env python3 """ -analysis_batch.py — OpenAI Batch API pipeline +analysis_batch.py — OpenAI Batch API job runner -Commands (run manually in order): - submit [--model gpt-4o] [--limit N] - — build request file, upload, create batch - status [run_id] — check batch status, update manifest - download [run_id] — download + normalize output, update manifest +Run tokenizer.py first to generate report.json, then: + create --model — build job directory + submit [--job N] [--dir DIR] — submit next eligible job + status [--job N] [--dir DIR] — check job status + download [--job N] [--dir DIR] — download + normalize completed jobs -run_id defaults to the most recent run in runs/ when omitted. - -File layout (all under analysis/gpt4o/): - requests/.jsonl — batch input sent to OpenAI - raw/.jsonl — raw batch output from OpenAI - runs/.json — run manifest - _.jsonl — normalized output (same schema as realtime) +DIR is a name under analysis/gpt4o/jobs/ (default: most recently created). """ import argparse import hashlib import json import os +import shutil import sys +import uuid from datetime import datetime, timezone from pathlib import Path @@ -35,9 +31,8 @@ except ImportError: # --------------------------------------------------------------------------- # Model limits and token estimation -# Max enqueued tokens across ALL concurrent batches for this model -# (docs/openai.md pricing table, updated 2026-05-05). -# NOTE: your org tier may be lower — if a submit fails, use --limit to reduce chunk size. +# Max enqueued tokens across ALL concurrent batches (docs/openai.md, 2026-05-05). +# Org-tier limits may be lower; use --job to limit submission size if needed. 
MODEL_LIMITS: dict[str, int] = { "gpt-5.5": 900_000, "gpt-5.4": 900_000, @@ -48,8 +43,6 @@ MODEL_LIMITS: dict[str, int] = { "gpt-o4-mini": 2_000_000, } _DEFAULT_TOKEN_LIMIT = 900_000 - -# tiktoken encoding per model family; unknown models fall back to o200k_base _MODEL_ENCODING: dict[str, str] = { "gpt-5.5": "o200k_base", "gpt-5.4": "o200k_base", @@ -59,16 +52,11 @@ _MODEL_ENCODING: dict[str, str] = { "gpt-4o-mini": "o200k_base", "gpt-o4-mini": "o200k_base", } -# Leave 10% headroom below the published limit _LIMIT_BUFFER = 0.90 def estimate_tokens(messages: list[dict], model: str) -> int: - """Estimate token count for a messages list. - - Uses tiktoken when available (exact for OpenAI models); falls back to - chars/3 + 4-token overhead per message for unknown/Anthropic models. - """ + """Exact token count via tiktoken; falls back to chars/3 + 4 overhead per message.""" try: import tiktoken enc = tiktoken.get_encoding(_MODEL_ENCODING.get(model, "o200k_base")) @@ -80,14 +68,11 @@ def estimate_tokens(messages: list[dict], model: str) -> int: def chunk_comments_by_tokens( comments: list[dict], forum: dict | None, model: str ) -> list[list[dict]]: - """Split comments into chunks where each chunk fits under the model token limit.""" - raw_limit = MODEL_LIMITS.get(model, _DEFAULT_TOKEN_LIMIT) - token_limit = int(raw_limit * _LIMIT_BUFFER) - + """Greedy bin-pack comments into chunks that fit under the model TPD limit.""" + token_limit = int(MODEL_LIMITS.get(model, _DEFAULT_TOKEN_LIMIT) * _LIMIT_BUFFER) chunks: list[list[dict]] = [] current: list[dict] = [] current_tokens = 0 - for comment in comments: messages, _ = build_messages(comment, forum) tokens = estimate_tokens(messages, model) @@ -98,10 +83,8 @@ def chunk_comments_by_tokens( else: current.append(comment) current_tokens += tokens - if current: chunks.append(current) - return chunks @@ -114,11 +97,11 @@ PROMPT_VERSION = hashlib.sha256(SYSTEM_PROMPT.encode("utf-8")).hexdigest()[:7] def _load_prompt(path: Path) -> 
None: - """Re-read a prompt file, updating module-level SYSTEM_PROMPT and PROMPT_VERSION.""" global SYSTEM_PROMPT, PROMPT_VERSION SYSTEM_PROMPT = path.read_text(encoding="utf-8").strip() PROMPT_VERSION = hashlib.sha256(SYSTEM_PROMPT.encode("utf-8")).hexdigest()[:7] + USER_TEMPLATE = """\ ## Proposed Regulation Title: {reg_title} @@ -141,17 +124,15 @@ MAX_COMMENT_CHARS = 6000 # --------------------------------------------------------------------------- # Directories -_SCRIPT_DIR = Path(__file__).parent -REQUESTS_DIR = _SCRIPT_DIR / "requests" -RAW_DIR = _SCRIPT_DIR / "raw" -RUNS_DIR = _SCRIPT_DIR / "runs" +_SCRIPT_DIR = Path(__file__).parent +JOBS_DIR = _SCRIPT_DIR / "jobs" # --------------------------------------------------------------------------- # Core functions (importable for tests) def load_items(path: Path) -> tuple[dict | None, list[dict]]: - """Read a scraped JSONL file. Returns (forum_item_or_None, [comment_items]).""" + """Read a scraped JSONL. Returns (forum_item_or_None, [comment_items]).""" forum = None comments = [] with open(path, encoding="utf-8") as f: @@ -172,7 +153,6 @@ def custom_id_from(comment_id: str) -> str: def parse_custom_id(custom_id: str) -> str: - """Return comment_id from a custom_id string.""" return custom_id.removeprefix("comment_") @@ -180,7 +160,6 @@ def build_messages(comment: dict, forum: dict | None) -> tuple[list, bool]: """Build OpenAI messages for one comment. Returns (messages, truncated).""" reg_title = (forum or {}).get("reg_title", "[unknown]") reg_desc = (forum or {}).get("reg_desc", "[unknown]") - body = (comment.get("text") or "").strip() truncated = False if not body: @@ -188,7 +167,6 @@ def build_messages(comment: dict, forum: dict | None) -> tuple[list, bool]: elif len(body) > MAX_COMMENT_CHARS: body = body[:MAX_COMMENT_CHARS] + "... 
[truncated]" truncated = True - user_text = USER_TEMPLATE.format( reg_title=reg_title, reg_desc=reg_desc, @@ -196,7 +174,6 @@ def build_messages(comment: dict, forum: dict | None) -> tuple[list, bool]: comment_title=comment.get("title", ""), comment_text=body, ) - return [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": user_text}, @@ -204,7 +181,6 @@ def build_messages(comment: dict, forum: dict | None) -> tuple[list, bool]: def build_batch_request_line(comment: dict, forum: dict | None, model: str) -> dict: - """Build one line of the batch input JSONL.""" messages, _ = build_messages(comment, forum) return { "custom_id": custom_id_from(comment["comment_id"]), @@ -227,14 +203,9 @@ def normalize_output_line( model: str, prompt_version: str, ) -> dict: - """Convert one raw batch output line into a normalized analysis record. - - comment_lookup: {comment_id: CommentItem dict} - prompt_version: taken from the run manifest so it reflects what was submitted. - """ + """Convert one raw batch output line into a normalized analysis record.""" comment_id = parse_custom_id(raw_line.get("custom_id", "")) comment = comment_lookup.get(comment_id, {}) - base = { "run_id": run_id, "forum_id": comment.get("forum_id", ""), @@ -245,20 +216,16 @@ def normalize_output_line( "input_title": comment.get("title", ""), "truncated": len(comment.get("text") or "") > MAX_COMMENT_CHARS, } - - # Check for outer-level batch error (e.g. 
batch_expired) if raw_line.get("error"): err = raw_line["error"] err_msg = err.get("message", str(err)) if isinstance(err, dict) else str(err) return {**base, "stance": None, "stance_confidence": None, "stance_rationale": None, "tone": None, "tags": None, "error": err_msg} - response = raw_line.get("response") or {} if response.get("status_code") != 200: return {**base, "stance": None, "stance_confidence": None, "stance_rationale": None, "tone": None, "tags": None, "error": f"status {response.get('status_code')}"} - try: content = response["body"]["choices"][0]["message"]["content"] data = json.loads(content) @@ -270,278 +237,372 @@ def normalize_output_line( "stance_rationale": None, "tone": None, "tags": None, "error": str(exc)} -def make_manifest( - run_id: str, - input_filename: str, - input_sha256: str, - model: str, - batch_id: str, - records_submitted: int, - request_filename: str, -) -> dict: - return { - "run_id": run_id, - "input_filename": input_filename, - "input_sha256": input_sha256, - "prompt_hash": PROMPT_VERSION, - "model": model, - "batch_id": batch_id, - "records_submitted": records_submitted, - "records_completed": None, - "records_failed": None, - "request_filename": request_filename, - "raw_output_filename": None, - "normalized_output_filename": None, - "created_at": datetime.now(timezone.utc).isoformat(), - "completed_at": None, +# --------------------------------------------------------------------------- +# Job directory management + + +def _next_job_dir(stem: str) -> Path: + base = stem[:8] + i = 1 + while (JOBS_DIR / f"{base}-{i}").exists(): + i += 1 + return JOBS_DIR / f"{base}-{i}" + + +def _latest_job_dir() -> Path: + if not JOBS_DIR.exists(): + sys.exit(f"No jobs directory found. Run 'create' first.") + status_files = list(JOBS_DIR.glob("*/status.json")) + if not status_files: + sys.exit(f"No jobs found in {JOBS_DIR}. 
Run 'create' first.") + return max(status_files, key=lambda p: p.stat().st_mtime).parent + + +def _resolve_job_dir(args) -> Path: + if getattr(args, "dir", None): + d = Path(args.dir) + if not d.is_absolute(): + d = JOBS_DIR / d + if not d.exists(): + sys.exit(f"Job directory not found: {d}") + return d + return _latest_job_dir() + + +def load_status(job_dir: Path) -> dict: + return json.loads((job_dir / "status.json").read_text(encoding="utf-8")) + + +def save_status(status: dict, job_dir: Path) -> None: + (job_dir / "status.json").write_text( + json.dumps(status, indent=2, ensure_ascii=False), encoding="utf-8" + ) + + +def _find_next_eligible_job(jobs: list[dict]) -> tuple[dict | None, str | None]: + """Return (next_pending_job, None) or (None, warning_message). + + A job is eligible when it is 'pending' and either it is the first job + or its predecessor is 'completed'. + """ + for j in jobs: + if j["status"] != "pending": + continue + if j["job_num"] == 1: + return j, None + prev = next(p for p in jobs if p["job_num"] == j["job_num"] - 1) + if prev["status"] == "completed": + return j, None + if prev["status"] in ("submitted", "in_progress", "validating", "finalizing"): + return None, ( + f"Job {prev['job_num']} is '{prev['status']}'. " + f"Wait for it to complete before submitting job {j['job_num']}." + ) + return None, None + + +# --------------------------------------------------------------------------- +# Subcommand: create + + +def cmd_create(args) -> None: + report_path = Path(args.report) + if not report_path.exists(): + sys.exit(f"Report not found: {report_path}") + + report = json.loads(report_path.read_text(encoding="utf-8")) + + if args.model not in report or not isinstance(report[args.model], dict): + available = [k for k in report if isinstance(report.get(k), dict)] + sys.exit(f"Model '{args.model}' not in report. 
Available: {', '.join(available)}") + + prompt_path = Path(report["prompt"]) + if not prompt_path.exists(): + sys.exit(f"Prompt file not found: {prompt_path}") + _load_prompt(prompt_path) + + input_path = Path(report["input_file"]) + if not input_path.exists(): + sys.exit(f"Input file not found: {input_path}") + forum, comments = load_items(input_path) + if not comments: + sys.exit("No comment items found in input file.") + + chunks = chunk_comments_by_tokens(comments, forum, args.model) + + stem = input_path.stem[:8] + job_dir = _next_job_dir(stem) + JOBS_DIR.mkdir(parents=True, exist_ok=True) + job_dir.mkdir() + + shutil.copy2(input_path, job_dir / "forum.jsonl") + shutil.copy2(prompt_path, job_dir / "prompt.txt") + shutil.copy2(report_path, job_dir / "report.json") + + jobs_meta = [] + for i, chunk in enumerate(chunks, start=1): + req_path = job_dir / f"job{i}-input.jsonl" + with open(req_path, "w", encoding="utf-8") as f: + for comment in chunk: + f.write(json.dumps(build_batch_request_line(comment, forum, args.model), + ensure_ascii=False) + "\n") + jobs_meta.append({ + "job_num": i, + "run_id": str(uuid.uuid4()), + "status": "pending", + "batch_id": None, + "records_submitted": len(chunk), + "records_completed": None, + "records_failed": None, + "submitted_at": None, + "completed_at": None, + }) + + model_info = report[args.model] + status = { + "model": args.model, + "prompt_hash": report["prompt_hash"], + "input_file": str(input_path), + "input_sha256": report["input_sha256"], + "total_comments": report["total_comments"], + "input_tokens": report["input_tokens"], + "est_queue_days": model_info["est_queue_days"], + "cost_$": model_info["cost_$"], + "total_jobs": len(chunks), + "jobs": jobs_meta, } + save_status(status, job_dir) - -def _latest_run_id() -> str: - """Return the run_id of the most recently saved manifest, or exit if none found.""" - runs = list(RUNS_DIR.glob("*.json")) if RUNS_DIR.exists() else [] - if not runs: - sys.exit(f"No runs found in 
{RUNS_DIR}. Submit a batch first.") - latest = max(runs, key=lambda p: p.stat().st_mtime) - return latest.stem - - -def load_manifest(run_id: str) -> dict: - path = RUNS_DIR / f"{run_id}.json" - return json.loads(path.read_text(encoding="utf-8")) - - -def save_manifest(manifest: dict) -> None: - RUNS_DIR.mkdir(parents=True, exist_ok=True) - path = RUNS_DIR / f"{manifest['run_id']}.json" - path.write_text(json.dumps(manifest, indent=2, ensure_ascii=False), encoding="utf-8") + print(f"Created: {job_dir.name}") + print(f" {len(chunks)} job(s) | {len(comments)} comments | model: {args.model}") + print(f"\nNext: python analysis/gpt4o/analysis_batch.py submit") # --------------------------------------------------------------------------- # Subcommand: submit -def _submit_chunk( - chunk: list[dict], - forum: dict | None, - input_path: Path, - input_sha256: str, - model: str, - client, - chunk_index: int, - total_chunks: int, -) -> str: - """Upload and submit one chunk of comments. Returns the run_id.""" - import uuid - run_id = str(uuid.uuid4()) - label = f"chunk {chunk_index + 1}/{total_chunks}" if total_chunks > 1 else "single batch" - REQUESTS_DIR.mkdir(parents=True, exist_ok=True) - request_path = REQUESTS_DIR / f"{run_id}.jsonl" - with open(request_path, "w", encoding="utf-8") as f: - for comment in chunk: - line = build_batch_request_line(comment, forum, model) - f.write(json.dumps(line, ensure_ascii=False) + "\n") +def cmd_submit(args, client) -> None: + job_dir = _resolve_job_dir(args) + status = load_status(job_dir) + jobs = status["jobs"] - print(f"[{label}] Wrote {len(chunk)} requests → {request_path}", file=sys.stderr) + if args.job: + target = next((j for j in jobs if j["job_num"] == args.job), None) + if target is None: + sys.exit(f"Job {args.job} not found in {job_dir.name}.") + if target["status"] != "pending": + sys.exit(f"Job {args.job} is already '{target['status']}' — cannot resubmit.") + if target["job_num"] > 1: + prev = next(p for p in jobs if 
p["job_num"] == target["job_num"] - 1) + if prev["status"] != "completed": + sys.exit( + f"Cannot submit job {target['job_num']}: " + f"job {prev['job_num']} is '{prev['status']}' (must be 'completed')." + ) + else: + target, warning = _find_next_eligible_job(jobs) + if warning: + print(warning, file=sys.stderr) + sys.exit(1) + if target is None: + all_done = all(j["status"] == "completed" for j in jobs) + print("All jobs completed." if all_done else "No pending jobs eligible for submission.") + return - with open(request_path, "rb") as f: + n = target["job_num"] + req_path = job_dir / f"job{n}-input.jsonl" + print(f"Submitting job {n}/{status['total_jobs']} ({target['records_submitted']} comments) ...", + file=sys.stderr) + + with open(req_path, "rb") as f: uploaded = client.files.create(file=f, purpose="batch") - print(f"[{label}] Uploaded: {uploaded.id}", file=sys.stderr) batch = client.batches.create( input_file_id=uploaded.id, endpoint="/v1/chat/completions", completion_window="24h", - metadata={"run_id": run_id, "input_filename": str(input_path)}, - ) - print(f"[{label}] Batch created: {batch.id} status={batch.status}", file=sys.stderr) - - manifest = make_manifest( - run_id=run_id, - input_filename=str(input_path), - input_sha256=input_sha256, - model=model, - batch_id=batch.id, - records_submitted=len(chunk), - request_filename=str(request_path), - ) - save_manifest(manifest) - return run_id - - -def cmd_submit(args, client) -> None: - _load_prompt(Path(args.prompt)) - print(f"Prompt: {args.prompt} (version {PROMPT_VERSION})", file=sys.stderr) - - input_path = Path(args.input) - if not input_path.exists(): - sys.exit(f"File not found: {input_path}") - - print(f"Reading {input_path} ...", file=sys.stderr) - forum, comments = load_items(input_path) - if not comments: - sys.exit("No comment items found in input file.") - if forum is None: - print("Warning: no ForumItem found — regulation context will be [unknown].", file=sys.stderr) - - if args.limit: - 
comments = comments[:args.limit] - print(f"Limiting to {len(comments)} comments (--limit {args.limit}).", file=sys.stderr) - - token_limit = int(MODEL_LIMITS.get(args.model, _DEFAULT_TOKEN_LIMIT) * _LIMIT_BUFFER) - chunks = chunk_comments_by_tokens(comments, forum, args.model) - total = len(chunks) - print( - f"Model: {args.model} token limit: {token_limit:,} " - f"→ {len(comments)} comments split into {total} chunk(s).", - file=sys.stderr, + metadata={"run_id": target["run_id"], "job_dir": job_dir.name}, ) - input_sha256 = hashlib.sha256(input_path.read_bytes()).hexdigest() + target["status"] = "submitted" + target["batch_id"] = batch.id + target["submitted_at"] = datetime.now(timezone.utc).isoformat() + save_status(status, job_dir) - # Submit only the first chunk — the enqueued token limit is a TOTAL across all - # concurrent batches, so stacking multiple submissions will exceed the quota. - # Wait for each batch to complete before submitting the next. - run_id = _submit_chunk(chunks[0], forum, input_path, input_sha256, args.model, client, 0, total) - - print(f"\nBatch 1/{total} submitted.", file=sys.stderr) - print(f" status: python analysis/gpt4o/analysis_batch.py status {run_id}", file=sys.stderr) - print(f" download: python analysis/gpt4o/analysis_batch.py download {run_id}", file=sys.stderr) - - if total > 1: - remaining = sum(len(c) for c in chunks[1:]) - print(f"\n{total - 1} more chunk(s) remaining ({remaining} comments).", file=sys.stderr) - print("After this batch completes and is downloaded, rerun submit with --limit to get the next chunk:", file=sys.stderr) - offset = len(chunks[0]) - for idx, chunk in enumerate(chunks[1:], start=2): - print(f" chunk {idx}/{total}: comments {offset}–{offset + len(chunk) - 1}", file=sys.stderr) - offset += len(chunk) - - print(run_id) # stdout for scripting + print(f"Job {n} submitted: {batch.id} ({batch.status})") + print(f" python analysis/gpt4o/analysis_batch.py status") # 
--------------------------------------------------------------------------- # Subcommand: status + def cmd_status(args, client) -> None: - run_id = args.run_id or _latest_run_id() - if not args.run_id: - print(f"(using latest run: {run_id})", file=sys.stderr) - manifest = load_manifest(run_id) - batch = client.batches.retrieve(manifest["batch_id"]) + job_dir = _resolve_job_dir(args) + status = load_status(job_dir) + jobs = status["jobs"] - counts = batch.request_counts - print(f"status: {batch.status}") - print(f"completed: {counts.completed}/{counts.total}") - print(f"failed: {counts.failed}") + job_filter = getattr(args, "job", None) - manifest["records_completed"] = counts.completed - manifest["records_failed"] = counts.failed - save_manifest(manifest) + for job in jobs: + if job_filter is not None and job["job_num"] != job_filter: + continue + if not job["batch_id"]: + continue + if job["status"] in ("completed", "failed", "expired", "cancelled", "pending"): + continue + batch = client.batches.retrieve(job["batch_id"]) + counts = batch.request_counts + if batch.status == "completed": + job["status"] = "completed" + if batch.completed_at: + job["completed_at"] = datetime.fromtimestamp( + batch.completed_at, tz=timezone.utc + ).isoformat() + elif batch.status in ("failed", "expired", "cancelled"): + job["status"] = batch.status + else: + job["status"] = batch.status + job["records_completed"] = counts.completed + job["records_failed"] = counts.failed - if batch.status == "completed": - print(f"\nReady to download. 
Run:") - print(f" python analysis/gpt4o/analysis_batch.py download {run_id}") + save_status(status, job_dir) + + target_jobs = jobs if not job_filter else [j for j in jobs if j["job_num"] == job_filter] + print(f"Dir: {job_dir.name} | Model: {status['model']} | {status['total_jobs']} job(s)") + print(f"{'Job':<5} {'Status':<14} {'Records':>12} {'Submitted':<20} {'Completed':<20}") + print("-" * 76) + for j in target_jobs: + rec = (f"{j['records_completed']}/{j['records_submitted']}" + if j["records_completed"] is not None else f"-/{j['records_submitted']}") + sub = (j["submitted_at"] or "-")[:19] + done = (j["completed_at"] or "-")[:19] + print(f"{j['job_num']:<5} {j['status']:<14} {rec:>12} {sub:<20} {done:<20}") # --------------------------------------------------------------------------- # Subcommand: download + def cmd_download(args, client) -> None: - run_id = args.run_id or _latest_run_id() - if not args.run_id: - print(f"(using latest run: {run_id})", file=sys.stderr) - manifest = load_manifest(run_id) - batch = client.batches.retrieve(manifest["batch_id"]) + job_dir = _resolve_job_dir(args) - if batch.status != "completed": - sys.exit(f"Batch not complete yet (status={batch.status}). 
Run 'status' to check.") + # Refresh status before deciding what to download + cmd_status(args, client) + status = load_status(job_dir) + jobs = status["jobs"] - run_id = manifest["run_id"] - model = manifest["model"] - model_slug = model.replace("/", "-") + job_filter = getattr(args, "job", None) + if job_filter: + candidates = [j for j in jobs if j["job_num"] == job_filter] + else: + candidates = [ + j for j in jobs + if j["status"] == "completed" + and not (job_dir / f"job{j['job_num']}-output.jsonl").exists() + ] - # Download raw output - RAW_DIR.mkdir(parents=True, exist_ok=True) - raw_path = RAW_DIR / f"{run_id}.jsonl" - raw_text = client.files.content(batch.output_file_id).text - raw_path.write_text(raw_text, encoding="utf-8") - print(f"Raw output → {raw_path}", file=sys.stderr) + if not candidates: + print("No completed jobs pending download.", file=sys.stderr) + return - # Build comment lookup from original input for reconciliation - input_path = Path(manifest["input_filename"]) - _, comments = load_items(input_path) - comment_lookup = {c["comment_id"]: c for c in comments} + _, all_comments = load_items(job_dir / "forum.jsonl") + comment_lookup = {c["comment_id"]: c for c in all_comments} - # Normalize - completed_at = datetime.now(timezone.utc).isoformat() - if batch.completed_at: - completed_at = datetime.fromtimestamp(batch.completed_at, tz=timezone.utc).isoformat() + for job in candidates: + n = job["job_num"] - normalized_path = _SCRIPT_DIR / f"{run_id}_{model_slug}.jsonl" - n_ok = n_err = 0 - with open(normalized_path, "w", encoding="utf-8") as out: - for line in raw_text.splitlines(): - if not line.strip(): - continue - raw_line = json.loads(line) - record = normalize_output_line(raw_line, comment_lookup, run_id, completed_at, model, manifest["prompt_hash"]) - out.write(json.dumps(record, ensure_ascii=False) + "\n") - if record["error"]: - n_err += 1 - else: - n_ok += 1 + if job["status"] != "completed": + print(f"Job {n} not yet completed 
('{job['status']}'), skipping.", file=sys.stderr) + continue - print(f"Normalized → {normalized_path} ({n_ok} ok, {n_err} errors)", file=sys.stderr) + batch = client.batches.retrieve(job["batch_id"]) - manifest["records_completed"] = n_ok - manifest["records_failed"] = n_err - manifest["raw_output_filename"] = str(raw_path) - manifest["normalized_output_filename"] = str(normalized_path) - manifest["completed_at"] = completed_at - save_manifest(manifest) - print(f"Manifest updated → {RUNS_DIR / run_id}.json", file=sys.stderr) + if not batch.output_file_id: + print(f"Job {n}: no output file available from OpenAI.", file=sys.stderr) + continue + + raw_text = client.files.content(batch.output_file_id).text + raw_path = job_dir / f"job{n}-output-raw.jsonl" + raw_path.write_text(raw_text, encoding="utf-8") + print(f"Job {n} raw → {raw_path.name}", file=sys.stderr) + + if batch.error_file_id: + err_text = client.files.content(batch.error_file_id).text + err_path = job_dir / f"job{n}-errors.jsonl" + err_path.write_text(err_text, encoding="utf-8") + n_err_lines = sum(1 for line in err_text.splitlines() if line.strip()) + print(f"Job {n} errors → {err_path.name} ({n_err_lines} lines)", file=sys.stderr) + + completed_at = job.get("completed_at") or datetime.now(timezone.utc).isoformat() + norm_path = job_dir / f"job{n}-output.jsonl" + n_ok = n_err = 0 + with open(norm_path, "w", encoding="utf-8") as out: + for line in raw_text.splitlines(): + if not line.strip(): + continue + record = normalize_output_line( + json.loads(line), comment_lookup, + job["run_id"], completed_at, + status["model"], status["prompt_hash"], + ) + out.write(json.dumps(record, ensure_ascii=False) + "\n") + if record["error"]: + n_err += 1 + else: + n_ok += 1 + + print(f"Job {n} normalized → {norm_path.name} ({n_ok} ok, {n_err} errors)", file=sys.stderr) + job["records_completed"] = n_ok + job["records_failed"] = n_err + + save_status(status, job_dir) # 
--------------------------------------------------------------------------- # CLI + +def _add_common_args(p: argparse.ArgumentParser) -> None: + p.add_argument("--job", type=int, default=None, metavar="N", + help="Job number within the run (default: auto)") + p.add_argument("--dir", default=None, metavar="DIR", + help="Job directory name or path (default: most recent)") + + def main() -> None: load_dotenv() - api_key = os.environ.get("OPENAI_API_KEY") - if not api_key: - sys.exit("OPENAI_API_KEY not set. Create a .env file or export the variable.") - parser = argparse.ArgumentParser( - description="Public comment batch analysis pipeline.", + description="Batch analysis job runner.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) sub = parser.add_subparsers(dest="command", required=True) - p_submit = sub.add_parser("submit", help="Build and submit a batch job") - p_submit.add_argument("input", help="Path to scraped JSONL file") - p_submit.add_argument("--model", default="gpt-4o", help="OpenAI model (default: gpt-4o)") - p_submit.add_argument( - "--prompt", - default=str(_DEFAULT_PROMPT_FILE), - help="Path to system prompt file (default: analysis/prompt-1.txt)", - ) - p_submit.add_argument( - "--limit", type=int, default=None, metavar="N", - help="Submit only the first N comments (useful for staying under token quota)", - ) + p_create = sub.add_parser("create", help="Create job directory from tokenizer report") + p_create.add_argument("report", help="Path to report.json from tokenizer.py") + p_create.add_argument("--model", required=True, help="Model (e.g. 
gpt-4o-mini)") - p_status = sub.add_parser("status", help="Check batch status") - p_status.add_argument("run_id", nargs="?", default=None, - help="run_id from submit (default: most recent run)") + p_submit = sub.add_parser("submit", help="Submit next eligible job") + _add_common_args(p_submit) - p_download = sub.add_parser("download", help="Download and normalize completed batch") - p_download.add_argument("run_id", nargs="?", default=None, - help="run_id from submit (default: most recent run)") + p_status = sub.add_parser("status", help="Check job status") + _add_common_args(p_status) + + p_download = sub.add_parser("download", help="Download and normalize completed jobs") + _add_common_args(p_download) args = parser.parse_args() + + if args.command == "create": + cmd_create(args) + return + + api_key = os.environ.get("OPENAI_API_KEY") + if not api_key: + sys.exit("OPENAI_API_KEY not set. Create a .env file or export the variable.") client = openai.OpenAI(api_key=api_key) if args.command == "submit": diff --git a/analysis/gpt4o/tokenizer.py b/analysis/gpt4o/tokenizer.py new file mode 100644 index 0000000..0680727 --- /dev/null +++ b/analysis/gpt4o/tokenizer.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 +""" +tokenizer.py — estimate token usage and cost for a batch analysis run. + +Usage: + python analysis/gpt4o/tokenizer.py output/f452.jsonl [--prompt analysis/prompt-1.txt] + +Prints a per-model comparison table and writes report.json next to the input file. +Run this before analysis_batch.py create. +""" + +import argparse +import hashlib +import json +import math +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) +import analysis_batch as _ab + +# Input pricing ($/1M tokens, batch API) — from docs/openai.md, updated 2026-05-05. +# Add Anthropic/other models here when needed; only models with a LIMITS entry are reported. 
+MODEL_PRICING: dict[str, float] = { + "gpt-5.5": 2.50, + "gpt-5.4": 1.25, + "gpt-5.4-mini": 0.375, + "gpt-5.4-nano": 0.10, + "gpt-4o": 1.25, + "gpt-4o-mini": 0.075, + "gpt-o4-mini": 0.55, +} + + +def compute_report( + comments: list[dict], + forum: dict | None, + prompt_hash: str, + input_file: str, + input_sha256: str, + prompt_file: str, +) -> dict: + """Compute token estimate and per-model job/cost/time breakdown.""" + # Use gpt-4o encoding as the canonical estimator (same for all current models) + total_tokens = sum( + _ab.estimate_tokens(_ab.build_messages(c, forum)[0], "gpt-4o") + for c in comments + ) + + report: dict = { + "prompt": prompt_file, + "prompt_hash": prompt_hash, + "input_file": input_file, + "input_sha256": input_sha256, + "total_comments": len(comments), + "input_tokens": total_tokens, + } + + for model, tpd in _ab.MODEL_LIMITS.items(): + effective_tpd = int(tpd * _ab._LIMIT_BUFFER) + jobs = math.ceil(total_tokens / effective_tpd) + cost = round(total_tokens / 1_000_000 * MODEL_PRICING.get(model, 0.0), 4) + est_days = round(total_tokens / tpd, 2) + report[model] = {"jobs": jobs, "cost_$": cost, "est_queue_days": est_days} + + return report + + +def print_table(report: dict) -> None: + """Print a human-readable model comparison table to stdout.""" + print(f"\nInput: {report['input_file']}") + print(f"Comments: {report['total_comments']:,}") + print(f"Tokens: {report['input_tokens']:,}") + print(f"Prompt: {report['prompt']} (hash: {report['prompt_hash']})") + print() + + # Cheapest model that fits in one job + single_job_models = [m for m in _ab.MODEL_LIMITS if report.get(m, {}).get("jobs") == 1] + best = (min(single_job_models, key=lambda m: report[m]["cost_$"]) + if single_job_models else None) + + print(f"{'Model':<15} {'Jobs':>5} {'Cost ($)':>9} {'Est days':>9} {'Note'}") + print("-" * 62) + for model in _ab.MODEL_LIMITS: + if model not in report or not isinstance(report[model], dict): + continue + m = report[model] + note = "<-- 
recommended" if model == best else "" + print(f"{model:<15} {m['jobs']:>5} {m['cost_$']:>9.4f} {m['est_queue_days']:>9.2f} {note}") + print() + + +def main() -> None: + _default_prompt = Path(__file__).parent.parent / "prompt-1.txt" + + parser = argparse.ArgumentParser(description="Estimate batch token usage and cost.") + parser.add_argument("input", help="Scraped JSONL file") + parser.add_argument( + "--prompt", + default=str(_default_prompt), + help=f"System prompt file (default: {_default_prompt.name})", + ) + args = parser.parse_args() + + input_path = Path(args.input) + if not input_path.exists(): + sys.exit(f"File not found: {input_path}") + + prompt_path = Path(args.prompt) + if not prompt_path.exists(): + sys.exit(f"Prompt file not found: {prompt_path}") + + prompt_text = prompt_path.read_text(encoding="utf-8").strip() + prompt_hash = hashlib.sha256(prompt_text.encode("utf-8")).hexdigest()[:7] + + # Ensure build_messages uses the specified prompt + _ab._load_prompt(prompt_path) + + forum, comments = _ab.load_items(input_path) + if not comments: + sys.exit("No comment items found.") + if forum is None: + print("Warning: no ForumItem — token estimates may be slightly low.", file=sys.stderr) + + input_sha256 = hashlib.sha256(input_path.read_bytes()).hexdigest() + + report = compute_report( + comments, forum, prompt_hash, + str(input_path), input_sha256, str(prompt_path), + ) + + print_table(report) + + out_path = input_path.parent / "report.json" + out_path.write_text(json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8") + print(f"Report written to: {out_path}") + print(f"\nNext: python analysis/gpt4o/analysis_batch.py create {out_path} --model ") + + +if __name__ == "__main__": + main() diff --git a/docs/tasks.org b/docs/tasks.org index 96b281e..80df585 100644 --- a/docs/tasks.org +++ b/docs/tasks.org @@ -158,7 +158,7 @@ forum_id_input,comment_id,title,text,date,author,stance,stance_confidence,stance - tests: 23 passing (pytest 
tests/analysis_gpt4o_batch.py), 51 total across suite
 - datetime: [2026-05-06 Wed 08:55]
 
-* [ ] t1.2.3: batch job refactor
+* [X] t1.2.3: batch job refactor
 This task encompasses intent and fixes for 1.2.1 and 1.2.2.
 batch processing should be a resumable job queue, not a one-shot script. the user should not need to remember offsets, completed chunks, failed batches, or which comments remain.
 ** Acceptance Criteria
@@ -200,6 +200,46 @@ batch processing should be a resumable job queue, not a one-shot script. the us
 - resume from status.json
 - remaining-comment detection
 
+** notes
+- analysis/gpt4o/tokenizer.py: new standalone script; imports analysis_batch for MODEL_LIMITS, estimate_tokens, build_messages. Reads input JSONL + prompt, computes per-model jobs/cost/time table, writes report.json to input file's directory. MODEL_PRICING dict lives here (not in analysis_batch).
+- analysis/gpt4o/analysis_batch.py: fully rewritten with four subcommands: create, submit, status, download. No longer uses REQUESTS_DIR / RAW_DIR / RUNS_DIR.
+- Job directories: analysis/gpt4o/jobs/<name>-N/ (e.g. f452-1). Each run is self-contained: forum.jsonl, prompt.txt, report.json, jobN-input.jsonl, jobN-output-raw.jsonl, jobN-output.jsonl, jobN-errors.jsonl.
+- status.json: tracks all jobs with pending/submitted/in_progress/completed/failed states. Updated by submit, status, download.
+- _find_next_eligible_job: pure function for testability. Returns (next_pending_job, None), (None, warning) when blocked, or (None, None) when every job is already completed. Blocks submission if previous job is in_progress/submitted.
+- create: no API key required. Reads report.json, re-chunks comments, writes all jobN-input.jsonl files, writes status.json.
+- submit: uploads jobN-input.jsonl to Files API, creates batch, updates status.json to 'submitted'. Will not stack batches.
+- status: retrieves batch from OpenAI, updates status.json counts and status.
+- download: auto-runs status first, downloads output_file_id → jobN-output-raw.jsonl, error_file_id → jobN-errors.jsonl, normalizes → jobN-output.jsonl. Updates status.json.
+- tests/test_tokenizer.py: 16 tests for compute_report schema, cost/time calculation, MODEL_PRICING coverage, print_table output, report.json round-trip.
+
+*** usage
+#+begin_src sh
+# 1. estimate tokens and cost
+python analysis/gpt4o/tokenizer.py output/f452.jsonl --prompt analysis/prompt-1.txt
+# writes output/report.json
+
+# 2. create job directory (no api key needed)
+python analysis/gpt4o/analysis_batch.py create output/report.json --model gpt-4o-mini
+# creates analysis/gpt4o/jobs/f452-1/
+
+# 3. submit first job
+python analysis/gpt4o/analysis_batch.py submit
+
+# 4. check status (repeat until completed)
+python analysis/gpt4o/analysis_batch.py status
+
+# 5. download and normalize
+python analysis/gpt4o/analysis_batch.py download
+
+# 6. submit next job (if multi-job run), then repeat 4-5
+python analysis/gpt4o/analysis_batch.py submit
+#+end_src
+
+** evidence
+- commit:
+- tests: passing (pytest tests/analysis_gpt4o_batch.py tests/test_tokenizer.py)
+- datetime: [2026-05-05 Tue]
+
 * === Backlog ===
 * [ ] X: analysis validation view
 create a lightweight validation script that joins raw comments to normalized analysis output and writes a human-reviewable csv.
diff --git a/tests/analysis_gpt4o_batch.py b/tests/analysis_gpt4o_batch.py index 3278583..5a62fc9 100644 --- a/tests/analysis_gpt4o_batch.py +++ b/tests/analysis_gpt4o_batch.py @@ -75,9 +75,24 @@ ANALYZED_AT = "2026-05-05T18:00:00+00:00" RUN_ID = "test-run-id-123" MODEL = "gpt-4o" +# Minimal status.json for testing job logic +def _make_status(jobs_override=None): + jobs = jobs_override or [ + {"job_num": 1, "run_id": "r1", "status": "pending", "batch_id": None, + "records_submitted": 60, "records_completed": None, "records_failed": None, + "submitted_at": None, "completed_at": None}, + ] + return { + "model": "gpt-4o-mini", "prompt_hash": "abc1234", + "input_file": "output/f452.jsonl", "input_sha256": "sha", + "total_comments": 100, "input_tokens": 50_000, + "est_queue_days": 0.025, "cost_$": 0.01, + "total_jobs": len(jobs), "jobs": jobs, + } + # --------------------------------------------------------------------------- -# Prompt versioning (batch reads the same prompt file) +# Prompt versioning def test_prompt_version_is_7_hex_chars(): assert len(bt.PROMPT_VERSION) == 7 @@ -206,52 +221,6 @@ def test_normalize_unknown_comment_id(): assert record["input_title"] == "" -# --------------------------------------------------------------------------- -# Manifest - -def test_make_manifest_all_keys(): - m = bt.make_manifest( - run_id=RUN_ID, - input_filename="output/forum452.jsonl", - input_sha256="abc123", - model="gpt-4o", - batch_id="batch_xyz", - records_submitted=100, - request_filename="analysis/gpt4o/requests/test-run-id-123.jsonl", - ) - required = { - "run_id", "input_filename", "input_sha256", "prompt_hash", "model", - "batch_id", "records_submitted", "records_completed", "records_failed", - "request_filename", "raw_output_filename", "normalized_output_filename", - "created_at", "completed_at", - } - assert required == set(m.keys()) - - -def test_make_manifest_initial_nulls(): - m = bt.make_manifest( - run_id=RUN_ID, input_filename="f", input_sha256="s", - 
def test_model_limits_has_required_models():
    """Every model the tests rely on must have an entry in MODEL_LIMITS."""
    required_models = ("gpt-4o", "gpt-4o-mini", "gpt-5.4", "gpt-5.4-mini", "gpt-o4-mini")
    for model in required_models:
        assert model in bt.MODEL_LIMITS, f"{model} missing from MODEL_LIMITS"


# ---------------------------------------------------------------------------
# status.json helpers

def _job_record(job_num, status, records_submitted, **fields):
    """Build one job record; fields not passed in stay None."""
    record = {
        "job_num": job_num,
        "run_id": f"r{job_num}",
        "status": status,
        "batch_id": None,
        "records_submitted": records_submitted,
        "records_completed": None,
        "records_failed": None,
        "submitted_at": None,
        "completed_at": None,
    }
    record.update(fields)
    return record


def test_status_save_load_roundtrip(tmp_path):
    """A status dict written by save_status is read back unchanged."""
    original = _make_status()
    bt.save_status(original, tmp_path)
    assert bt.load_status(tmp_path) == original


# ---------------------------------------------------------------------------
# _find_next_eligible_job

def test_find_next_eligible_job_first_job_pending():
    """With a single pending job, that job is the one offered."""
    target, warning = bt._find_next_eligible_job(_make_status()["jobs"])
    assert target["job_num"] == 1
    assert warning is None


def test_find_next_eligible_job_after_completed():
    """A completed first job unblocks the pending second job."""
    finished = _job_record(
        1, "completed", 60,
        batch_id="b1", records_completed=60, records_failed=0,
        submitted_at="t", completed_at="t",
    )
    waiting = _job_record(2, "pending", 40)
    target, warning = bt._find_next_eligible_job([finished, waiting])
    assert target["job_num"] == 2
    assert warning is None


def test_find_next_eligible_job_blocked_by_in_progress():
    """An in_progress job yields no target and a warning naming that state."""
    running = _job_record(1, "in_progress", 60, batch_id="b1", submitted_at="t")
    waiting = _job_record(2, "pending", 40)
    target, warning = bt._find_next_eligible_job([running, waiting])
    assert target is None
    assert warning is not None
    assert "in_progress" in warning


def test_find_next_eligible_job_all_completed():
    """When nothing is left to submit, both target and warning are None."""
    finished = _job_record(
        1, "completed", 60,
        batch_id="b1", records_completed=60, records_failed=0,
        submitted_at="t", completed_at="t",
    )
    target, warning = bt._find_next_eligible_job([finished])
    assert target is None
    assert warning is None


def test_resume_from_status_json(tmp_path):
    """Reload a status.json with one completed job and find the next pending job."""
    finished = _job_record(
        1, "completed", 60,
        batch_id="b1", records_completed=58, records_failed=2,
        submitted_at="2026-05-06T10:00:00+00:00",
        completed_at="2026-05-06T11:00:00+00:00",
    )
    waiting = _job_record(2, "pending", 40)
    bt.save_status(_make_status([finished, waiting]), tmp_path)

    reloaded = bt.load_status(tmp_path)
    target, warning = bt._find_next_eligible_job(reloaded["jobs"])
    assert target["job_num"] == 2
    assert warning is None
# ---------------------------------------------------------------------------
# normalize: out-of-order and duplicate custom_id

def test_out_of_order_output_reconciled_by_custom_id():
    """Raw lines processed in any order are mapped to the correct comment."""
    second_comment = dict(COMMENT_ITEM, comment_id="99999", title="Second comment")
    by_comment_id = {
        COMMENT_ITEM["comment_id"]: COMMENT_ITEM,
        "99999": second_comment,
    }

    # Feed the line for the second comment first, then the first comment's line.
    raw_lines = [dict(RAW_SUCCESS_LINE, custom_id="comment_99999"), RAW_SUCCESS_LINE]
    records = [
        bt.normalize_output_line(raw, by_comment_id, RUN_ID, ANALYZED_AT, MODEL, bt.PROMPT_VERSION)
        for raw in raw_lines
    ]
    for_second, for_first = records

    assert for_second["comment_id"] == "99999"
    assert for_second["input_title"] == "Second comment"
    assert for_first["comment_id"] == "87914"
    assert for_first["input_title"] == COMMENT_ITEM["title"]


def test_duplicate_custom_id_both_produce_valid_records():
    """Two raw lines with the same custom_id each produce a valid record."""
    first, second = (
        bt.normalize_output_line(
            RAW_SUCCESS_LINE, COMMENT_LOOKUP, RUN_ID, ANALYZED_AT, MODEL, bt.PROMPT_VERSION
        )
        for _ in range(2)
    )
    assert first["comment_id"] == second["comment_id"] == "87914"
    assert first["error"] is None
    assert second["error"] is None
"452", + "reg_title": "Model Policies for Transgender Students", + "reg_desc": "Guidance developed in response to HB 145.", +} + +COMMENT_A = { + "forum_id": "452", + "comment_id": "100", + "author": "Alice", + "date": "2021-01-04T09:15:00", + "title": "Support", + "text": "I support this policy.", +} + +COMMENT_B = { + "forum_id": "452", + "comment_id": "101", + "author": "Bob", + "date": "2021-01-05T10:00:00", + "title": "Oppose", + "text": "I oppose this policy.", +} + +COMMENTS = [COMMENT_A, COMMENT_B] +PROMPT_HASH = "abc1234" +INPUT_FILE = "output/f452.jsonl" +INPUT_SHA256 = "deadbeef" * 8 +PROMPT_FILE = "analysis/prompt-1.txt" + + +def _make_report(total_tokens=10_000): + return tk.compute_report( + COMMENTS, FORUM_ITEM, PROMPT_HASH, INPUT_FILE, INPUT_SHA256, PROMPT_FILE + ) + + +# --------------------------------------------------------------------------- +# compute_report: required top-level keys + +def test_report_has_top_level_keys(): + report = _make_report() + required = {"prompt", "prompt_hash", "input_file", "input_sha256", + "total_comments", "input_tokens"} + assert required.issubset(set(report.keys())) + + +def test_report_metadata_values(): + report = _make_report() + assert report["prompt"] == PROMPT_FILE + assert report["prompt_hash"] == PROMPT_HASH + assert report["input_file"] == INPUT_FILE + assert report["input_sha256"] == INPUT_SHA256 + assert report["total_comments"] == 2 + + +def test_report_input_tokens_positive(): + report = _make_report() + assert isinstance(report["input_tokens"], int) + assert report["input_tokens"] > 0 + + +# --------------------------------------------------------------------------- +# compute_report: per-model entries + +def test_report_has_per_model_keys(): + report = _make_report() + for model in ab.MODEL_LIMITS: + assert model in report, f"Model {model} missing from report" + assert isinstance(report[model], dict) + + +def test_report_per_model_has_required_fields(): + report = _make_report() + for model in 
ab.MODEL_LIMITS: + m = report[model] + assert "jobs" in m + assert "cost_$" in m + assert "est_queue_days" in m + + +def test_report_jobs_at_least_one(): + report = _make_report() + for model in ab.MODEL_LIMITS: + assert report[model]["jobs"] >= 1 + + +# --------------------------------------------------------------------------- +# compute_report: calculation accuracy + +def test_cost_calculation(): + """cost_$ = total_tokens / 1M * pricing_rate""" + report = _make_report() + total = report["input_tokens"] + for model in ab.MODEL_LIMITS: + expected_cost = round(total / 1_000_000 * tk.MODEL_PRICING.get(model, 0.0), 4) + assert report[model]["cost_$"] == pytest.approx(expected_cost, abs=1e-6) + + +def test_est_queue_days_calculation(): + """est_queue_days = total_tokens / tpd (rounded to 2 decimal places)""" + report = _make_report() + total = report["input_tokens"] + for model, tpd in ab.MODEL_LIMITS.items(): + expected = round(total / tpd, 2) + assert report[model]["est_queue_days"] == pytest.approx(expected, abs=1e-4) + + +def test_jobs_ceiling_division(): + """jobs = ceil(total_tokens / (tpd * _LIMIT_BUFFER))""" + report = _make_report() + total = report["input_tokens"] + for model, tpd in ab.MODEL_LIMITS.items(): + effective = int(tpd * ab._LIMIT_BUFFER) + expected = math.ceil(total / effective) + assert report[model]["jobs"] == expected + + +def test_more_comments_increases_tokens(): + """More comments → more input_tokens.""" + few = tk.compute_report([COMMENT_A], FORUM_ITEM, PROMPT_HASH, INPUT_FILE, INPUT_SHA256, PROMPT_FILE) + many = tk.compute_report(COMMENTS, FORUM_ITEM, PROMPT_HASH, INPUT_FILE, INPUT_SHA256, PROMPT_FILE) + assert many["input_tokens"] > few["input_tokens"] + + +# --------------------------------------------------------------------------- +# MODEL_PRICING coverage + +def test_model_pricing_has_required_models(): + for model in ("gpt-4o", "gpt-4o-mini", "gpt-5.4", "gpt-5.4-mini", "gpt-o4-mini"): + assert model in tk.MODEL_PRICING, f"{model} 
missing from MODEL_PRICING" + + +def test_model_pricing_values_positive(): + for model, price in tk.MODEL_PRICING.items(): + assert price > 0, f"{model} has non-positive price" + + +# --------------------------------------------------------------------------- +# print_table: runs without error, produces output + +def test_print_table_runs(): + report = _make_report() + buf = io.StringIO() + with patch("sys.stdout", buf): + tk.print_table(report) + output = buf.getvalue() + assert "gpt-4o" in output + assert "gpt-4o-mini" in output + + +def test_print_table_shows_all_models(): + report = _make_report() + buf = io.StringIO() + with patch("sys.stdout", buf): + tk.print_table(report) + output = buf.getvalue() + for model in ab.MODEL_LIMITS: + assert model in output, f"{model} not shown in print_table output" + + +def test_print_table_highlights_recommended(): + """When a single-job cheapest model exists, table marks it as recommended.""" + report = _make_report() + buf = io.StringIO() + with patch("sys.stdout", buf): + tk.print_table(report) + output = buf.getvalue() + assert "recommended" in output + + +# --------------------------------------------------------------------------- +# report.json round-trip (write → read) + +def test_report_json_roundtrip(tmp_path): + report = _make_report() + out = tmp_path / "report.json" + out.write_text(json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8") + loaded = json.loads(out.read_text(encoding="utf-8")) + assert loaded["total_comments"] == report["total_comments"] + assert loaded["input_tokens"] == report["input_tokens"] + assert loaded["gpt-4o-mini"]["jobs"] == report["gpt-4o-mini"]["jobs"]