commit 375564a74735195015b853fe4ec2af98ff6e4fa0
parent 450388ee71cfe1d95d028029358c8007afb36ba5
Author: Brian Graham <brian@buildingbetterteams.de>
Date: Sat, 11 Apr 2026 07:45:46 +0200
Add run-scan-v5-haiku.py: pure Haiku output, no Opus merge
v4-haiku script merged Opus answers at write time, contaminating
scan-v4.json with v2 Opus overwrites. v5 script writes raw
Haiku/Sonnet output to scan-v5.json so per-question Haiku-Opus
comparisons remain possible for calibration analysis.
The build pipeline will handle the Opus/Haiku merge at read time,
preferring Opus where available but keeping the raw v5 data around.
Usage: python3 scripts/run-scan-v5-haiku.py --parallel 8
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Diffstat:
1 file changed, 367 insertions(+), 0 deletions(-)
diff --git a/scripts/run-scan-v5-haiku.py b/scripts/run-scan-v5-haiku.py
@@ -0,0 +1,367 @@
+#!/usr/bin/env python3
+"""
+V5 Haiku scan: fast coverage pass for all papers. PURE Haiku output — no merge.
+
+For each paper with paper.txt and paper_type.json:
+1. Read paper text + paper_type
+2. Run Haiku (Sonnet for papers over 50,000 characters, or as a fallback when the Haiku run fails) to answer shared core + type-specific questions
+3. Write scan-v5.json with raw Haiku/Sonnet answers (no Opus merge).
+
+The build pipeline handles Opus/Haiku merging at read time. This keeps v5
+files pure for calibration analysis (Haiku vs Opus per question).
+
+Usage:
+ python3 scripts/run-scan-v5-haiku.py # All unscanned
+ python3 scripts/run-scan-v5-haiku.py --limit 10 # First N
+ python3 scripts/run-scan-v5-haiku.py --parallel 8 # Concurrent
+ python3 scripts/run-scan-v5-haiku.py --id metr-rct-2025 # Specific paper
+ python3 scripts/run-scan-v5-haiku.py --force # Re-scan all
+"""
+
+import json
+import subprocess
+import sys
+import urllib.parse
+import urllib.request
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from pathlib import Path
+
# Repository layout: the repo root is two directory levels above this
# script's resolved path.
ROOT = Path(__file__).resolve().parent.parent
PAPERS_DIR = ROOT / "papers"
# NOTE(review): the v5 scan still takes its question text from the v4 schema
# file — confirm that no separate v5 schema is intended.
SCHEMA_PATH = ROOT / "schema" / "scan-v4.schema.json"

# Load schema for question descriptions. Decode explicitly as UTF-8 so the
# result does not depend on the platform's locale default encoding.
with open(SCHEMA_PATH, encoding="utf-8") as f:
    SCHEMA = json.load(f)
+
+# ── Build prompt from schema ──────────────────────────────────────────
+
def build_questions_text(category_obj):
    """Render the questions of one schema category as a bullet list.

    Every entry under the category's ``properties`` becomes one line of the
    form `` - **<name>**: <description>``. When an entry has a ``$ref`` but no
    ``description`` key at all, the question name is used as its description.

    Returns the lines joined with newlines ("" if there are no properties).
    """
    def describe(name, spec):
        text = spec.get("description", "")
        if text or "$ref" not in spec:
            return text
        # Empty/absent description on a $ref entry: the name is substituted
        # only when the key is truly missing.
        return spec.get("description", name)

    questions = category_obj.get("properties", {})
    return "\n".join(
        f" - **{name}**: {describe(name, spec)}"
        for name, spec in questions.items()
    )
+
+
def build_prompt(paper_type, paper_text, paper_id, registry_entry, hn_data):
    """Assemble the v5 scan prompt for one paper.

    Args:
        paper_type: Key used to select the type-specific module under
            ``SCHEMA["properties"]["type_checklist"]["properties"]``; an
            unknown type yields an empty module section.
        paper_text: Full paper text, appended at the end of the prompt.
        paper_id: Accepted for call-site symmetry; not used in the prompt.
        registry_entry: Registry record embedded as pretty-printed JSON
            ("{}" when empty or None).
        hn_data: Hacker News record, embedded verbatim in the output template.

    Returns:
        The complete prompt string. Note that the template always tells the
        model to emit ``"source": "haiku"``, whichever model is later invoked.
    """
    checklist_props = SCHEMA["properties"]["checklist"]["properties"]
    type_checklists = SCHEMA["properties"]["type_checklist"]["properties"]
    type_props = type_checklists.get(paper_type, {}).get("properties", {})

    def render(categories):
        # One "### <category>" heading, its description, and its questions.
        return "".join(
            f"\n### {name}\n{spec.get('description', name)}\n{build_questions_text(spec)}\n"
            for name, spec in categories.items()
        )

    core_section = render(checklist_props)
    type_section = render(type_props)

    if registry_entry:
        reg_json = json.dumps(registry_entry, indent=2, ensure_ascii=False)
    else:
        reg_json = "{}"

    module_title = paper_type.title()
    hn_json = json.dumps(hn_data)

    return f"""You are scanning a research paper for methodological quality. This is a {paper_type} paper.

Answer every question with a JSON object containing:
- "applies": true/false (is this criterion relevant to this paper?)
- "answer": true/false (does the paper satisfy it? false when applies=false)
- "justification": "1-2 sentences citing specific evidence"
- "source": "haiku"

Be strict. Absence of evidence = answer: false. Do not be generous.

## Registry Entry
```json
{reg_json}
```

## Shared Core Questions (answer ALL of these)
{core_section}

## {module_title} Module Questions (answer ALL of these)
{type_section}

## Additional Required Fields

### Claims
Extract 3-8 key empirical claims. For each: {{"claim": "...", "evidence": "...", "supported": "strong|moderate|weak|unsupported"}}

### Key Findings
2-4 sentence summary of the paper's most important findings.

### Red Flags
List methodological concerns: {{"flag": "short label", "detail": "explanation"}}

### Methodology Tags
Assign from: rct, observational, benchmark-eval, case-study, meta-analysis, theoretical, qualitative

### Cited Papers
Extract 3-10 survey-relevant references: {{"title": "...", "relevance": "..."}}

### Engagement Factors
Rate 0-3 on each dimension:
- practical_relevance: Can practitioners use this?
- surprise_contrarian: Challenges conventional wisdom?
- fear_safety: Raises AI risk concerns?
- drama_conflict: Controversy angle?
- demo_ability: Can someone try it now?
- brand_recognition: Famous lab or product?
Each: {{"score": 0-3, "justification": "1 sentence"}}

## Output

Respond with a single JSON object:
{{
 "scan_version": 5,
 "paper_type": "{paper_type}",
 "paper": {{"title": "...", "authors": [...], "year": ..., "venue": "...", "arxiv_id": "...", "doi": "..."}},
 "checklist": {{<shared core categories with questions>}},
 "type_checklist": {{"{paper_type}": {{<type-specific categories with questions>}}}},
 "claims": [...],
 "methodology_tags": [...],
 "key_findings": "...",
 "red_flags": [...],
 "cited_papers": [...],
 "engagement_factors": {{...}},
 "hn_data": {hn_json}
}}

## Paper Text
{paper_text}
"""
+
+
+# ── HN Fetch ──────────────────────────────────────────────────────────
+
def _empty_hn_data():
    """Return a fresh zeroed hn_data record (a new dict on every call)."""
    return {"threads": [], "top_points": 0, "total_points": 0, "total_comments": 0}


def fetch_hn(paper_id, arxiv_id=""):
    """Fetch Hacker News data for a paper, best-effort.

    Lookup order:
      1. ``papers/<paper_id>/hn.json`` if present and readable (returned as-is).
      2. A live Algolia HN search for ``arxiv_id`` (stories only, up to 10 hits).

    Args:
        paper_id: Directory name of the paper under ``PAPERS_DIR``.
        arxiv_id: Search term for the live lookup; when empty no request is made.

    Returns:
        A dict with keys ``threads`` (sorted by points, highest first),
        ``top_points``, ``total_points`` and ``total_comments``. Any failure of
        the live lookup yields the zeroed record rather than an exception.
    """
    hn_path = PAPERS_DIR / paper_id / "hn.json"
    if hn_path.exists():
        try:
            with open(hn_path, encoding="utf-8") as f:
                return json.load(f)
        except (OSError, ValueError) as e:
            # A corrupt or unreadable cache file is treated as a cache miss so
            # one bad file cannot abort the scan; say so instead of hiding it.
            print(f"  WARN: ignoring unreadable {hn_path}: {e}", file=sys.stderr)

    if not arxiv_id:
        return _empty_hn_data()

    try:
        params = urllib.parse.urlencode({"query": arxiv_id, "tags": "story", "hitsPerPage": 10})
        req = urllib.request.Request(
            f"https://hn.algolia.com/api/v1/search?{params}",
            headers={"User-Agent": "research-survey/1.0"},
        )
        # Context manager closes the HTTP response even if reading/parsing fails.
        with urllib.request.urlopen(req, timeout=10) as resp:
            data = json.loads(resp.read())

        threads = [
            {
                "hn_id": h.get("objectID", ""),
                "title": h.get("title", ""),
                # `or 0` maps an explicit null in the API response to 0.
                "points": h.get("points", 0) or 0,
                "comments": h.get("num_comments", 0) or 0,
                "url": f"https://news.ycombinator.com/item?id={h.get('objectID', '')}",
            }
            for h in data.get("hits", [])
        ]
        threads.sort(key=lambda t: -t["points"])
        return {
            "threads": threads,
            "top_points": threads[0]["points"] if threads else 0,
            "total_points": sum(t["points"] for t in threads),
            "total_comments": sum(t["comments"] for t in threads),
        }
    except Exception:
        # Deliberate best-effort: HN data is optional enrichment, so network,
        # HTTP and parse failures all degrade to "no threads found".
        return _empty_hn_data()
+
+
+# ── Scan One Paper ────────────────────────────────────────────────────
+
def load_registry():
    """Load ``registry.jsonl`` from the repo root into a dict keyed by entry id.

    Blank lines are skipped. Each remaining line must be a JSON object with an
    ``"id"`` key; a later line with the same id replaces an earlier one.

    Raises:
        OSError: if the registry file cannot be opened.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
        KeyError: if an entry has no ``"id"``.
    """
    entries = {}
    # Explicit UTF-8: the default would otherwise follow the platform locale.
    with open(ROOT / "registry.jsonl", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                entry = json.loads(line)
                entries[entry["id"]] = entry
    return entries
+
+
def _run_claude(prompt, model):
    """Invoke the `claude` CLI once with the prompt on stdin; return the CompletedProcess."""
    return subprocess.run(
        ["claude", "-p", "-", "--model", model, "--max-turns", "1"],
        input=prompt,
        capture_output=True, text=True, timeout=600,
        cwd=str(ROOT),
    )


def _stamp_source(categories, model):
    """Set "source" to `model` on every answer dict in a {category: {question: answer}} mapping."""
    if not isinstance(categories, dict):
        return
    for questions in categories.values():
        if not isinstance(questions, dict):
            continue
        for answer in questions.values():
            # Only dicts carrying "applies" are question answers.
            if isinstance(answer, dict) and "applies" in answer:
                answer["source"] = model


def scan_one(paper_id, registry, force=False):
    """Run the v5 scan on one paper and write ``scan-v5.json``.

    Haiku is used by default; Sonnet is used when the paper text exceeds
    50000 characters, or as a single retry when the Haiku run exits non-zero.
    The raw model output is written without any merge with Opus answers.

    Args:
        paper_id: Directory name of the paper under ``PAPERS_DIR``.
        registry: Mapping of paper id to registry entry.
        force: Re-scan even if ``scan-v5.json`` already exists.

    Returns:
        ``(paper_id, ok, reason, stats)``; ``stats`` is currently always ``{}``.
        Failures are reported through ``ok=False`` and ``reason`` rather than
        raised.
    """
    paper_dir = PAPERS_DIR / paper_id
    v5_path = paper_dir / "scan-v5.json"
    if v5_path.exists() and not force:
        return paper_id, True, "already scanned", {}

    txt_path = paper_dir / "paper.txt"
    type_path = paper_dir / "paper_type.json"

    if not txt_path.exists():
        return paper_id, False, "no paper.txt", {}
    if not type_path.exists():
        return paper_id, False, "no paper_type.json", {}

    # A malformed paper_type.json is reported as a per-paper failure instead
    # of raising out of the worker.
    try:
        with open(type_path, encoding="utf-8") as f:
            paper_type = json.load(f).get("paper_type")
    except (OSError, ValueError, AttributeError):
        paper_type = None
    if not paper_type:
        return paper_id, False, "invalid paper_type", {}

    try:
        paper_text = txt_path.read_text(encoding="utf-8").replace("\x00", "")
        reg_entry = registry.get(paper_id, {})
        arxiv_id = reg_entry.get("arxiv_id", "")

        # Fetch HN data
        hn_data = fetch_hn(paper_id, arxiv_id)

        # Build and run prompt
        prompt = build_prompt(paper_type, paper_text, paper_id, reg_entry, hn_data)

        # Pick model: haiku for most papers, sonnet for large ones
        model = "sonnet" if len(paper_text) > 50000 else "haiku"

        result = _run_claude(prompt, model)
        if result.returncode != 0:
            if model != "haiku":
                return paper_id, False, f"claude exit {result.returncode}", {}
            # Retry with sonnet if haiku failed
            model = "sonnet"
            result = _run_claude(prompt, model)
            if result.returncode != 0:
                return paper_id, False, f"claude exit {result.returncode} (sonnet fallback)", {}

        # The model may wrap the JSON in prose or code fences: take the span
        # from the first "{" to the last "}".
        output = result.stdout.strip()
        json_start = output.find("{")
        json_end = output.rfind("}") + 1
        if json_start == -1 or json_end == 0:
            return paper_id, False, "no JSON in output", {}

        v5_scan = json.loads(output[json_start:json_end])

        # Ensure required fields
        v5_scan["scan_version"] = 5
        v5_scan["paper_type"] = paper_type
        v5_scan["hn_data"] = hn_data

        # Mark all answers with the model that actually produced them. This
        # must overwrite: the prompt instructs the model to emit
        # "source": "haiku", so a "only if missing" rule would leave Sonnet
        # answers mislabelled as Haiku.
        scan_model = model  # haiku or sonnet
        _stamp_source(v5_scan.get("checklist", {}), scan_model)
        type_checklist = v5_scan.get("type_checklist", {})
        if isinstance(type_checklist, dict):
            # type_checklist has one extra nesting level: {paper_type: {category: ...}}
            for ptype_data in type_checklist.values():
                _stamp_source(ptype_data, scan_model)

        # Write v5 scan — pure Haiku/Sonnet output, NO merge with Opus.
        # The build pipeline will overlay Opus answers at read time when both exist.
        # Keeping them separate preserves the ability to compare per-question.
        # Explicit UTF-8 because ensure_ascii=False emits non-ASCII characters.
        with open(v5_path, "w", encoding="utf-8") as f:
            json.dump(v5_scan, f, ensure_ascii=False, indent=2)

        return paper_id, True, f"{scan_model}-only", {}

    except json.JSONDecodeError as e:
        return paper_id, False, f"JSON error: {e}", {}
    except subprocess.TimeoutExpired:
        return paper_id, False, "timeout", {}
    except Exception as e:
        return paper_id, False, f"error: {e}", {}
+
+
+# ── Main ──────────────────────────────────────────────────────────────
+
def main():
    """Command-line entry point: select papers, scan them, print a summary.

    Flags:
        --force        Re-scan papers that already have scan-v5.json.
        --limit N      Scan at most the first N candidates (sorted by path).
        --id PAPER_ID  Scan only this paper.
        --parallel N   Scan with N worker threads (sequential when N <= 1).

    Candidates are paper directories containing both ``paper_type.json`` and
    ``paper.txt``. Failures are printed per paper; an unexpected exception in
    one scan is recorded as a failure and does not abort the batch.
    """
    # Local import: only the CLI entry point needs argparse.
    import argparse

    parser = argparse.ArgumentParser(
        description="V5 Haiku scan: pure Haiku/Sonnet output, no Opus merge."
    )
    parser.add_argument("--force", action="store_true",
                        help="re-scan papers that already have scan-v5.json")
    parser.add_argument("--limit", type=int, default=None,
                        help="scan at most the first N candidates")
    parser.add_argument("--id", dest="specific_id", default=None,
                        help="scan only the paper with this id")
    parser.add_argument("--parallel", type=int, default=1,
                        help="number of concurrent scans")
    opts = parser.parse_args()

    registry = load_registry()

    # Collect candidates: papers with paper.txt + paper_type.json
    candidates = []
    for type_path in sorted(PAPERS_DIR.glob("*/paper_type.json")):
        paper_dir = type_path.parent
        pid = paper_dir.name
        if opts.specific_id and pid != opts.specific_id:
            continue
        already_scanned = (paper_dir / "scan-v5.json").exists()
        # An explicitly requested --id stays a candidate even when scanned;
        # scan_one then decides based on `force`.
        if already_scanned and not opts.force and not opts.specific_id:
            continue
        if not (paper_dir / "paper.txt").exists():
            continue
        candidates.append(pid)

    # `is not None` so that an explicit --limit 0 is honoured.
    if opts.limit is not None:
        candidates = candidates[:opts.limit]

    if not candidates:
        print("No papers to scan.")
        return

    print(f"V5 Haiku scan: {len(candidates)} papers"
          f"{f' (parallel={opts.parallel})' if opts.parallel > 1 else ''}:\n")

    counts = {"ok": 0, "fail": 0}

    def safe_scan(pid):
        # Convert any unexpected exception into an ordinary failure tuple.
        try:
            return scan_one(pid, registry, opts.force)
        except Exception as e:
            return pid, False, f"error: {e}", {}

    def record(pid, ok, reason):
        if ok:
            counts["ok"] += 1
        else:
            counts["fail"] += 1
            print(f"  FAIL: {pid} — {reason}")

    if opts.parallel > 1:
        with ThreadPoolExecutor(max_workers=opts.parallel) as executor:
            futures = [executor.submit(safe_scan, pid) for pid in candidates]
            for future in as_completed(futures):
                pid, ok, reason, _stats = future.result()
                record(pid, ok, reason)
    else:
        for position, pid in enumerate(candidates, start=1):
            if position % 20 == 0:
                print(f"  ... {position}/{len(candidates)}")
            pid, ok, reason, _stats = safe_scan(pid)
            record(pid, ok, reason)

    print(f"\nDone. OK: {counts['ok']}, Failed: {counts['fail']}")
+
+
# Run the scan only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()