run-scan.py (9547B)
#!/usr/bin/env python3
"""
Orchestrate the scan pipeline: extract text → run scan agent → validate output.

For each paper with status 'downloaded':
1. Extract text if paper.txt doesn't exist (same heuristics as extract-text.py)
2. Run the scan agent via claude CLI
3. Validate scan.json against the schema
4. Update registry status to 'scanned'

Usage:
    python scripts/run-scan.py                        # All downloaded papers
    python scripts/run-scan.py --id metr-rct-2025     # Specific paper
    python scripts/run-scan.py --limit 10             # First N papers
    python scripts/run-scan.py --dry-run              # Show what would be scanned
    python scripts/run-scan.py --parallel 4           # Run N scans concurrently
    python scripts/run-scan.py --max-turns 12         # Agent turn budget (default 8)
"""

import argparse
import json
import os
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

ROOT = Path(__file__).resolve().parent.parent
REGISTRY_PATH = ROOT / "registry.jsonl"
PAPERS_DIR = ROOT / "papers"
SCAN_AGENT_PROMPT = ROOT / "agents" / "scan-agent.md"
SCAN_SCHEMA = ROOT / "schema" / "scan.schema.json"
VALIDATOR_SCRIPT = ROOT / "scripts" / "validate-scan.py"

# Heuristics for text extraction quality (duplicated from extract-text.py)
MIN_CHARS = 500
MIN_WORDS_PER_PAGE = 30
MAX_GARBLE_RATIO = 0.15

# Wall-clock budget for a single `claude` invocation.
SCAN_TIMEOUT_SECONDS = 600

# Top-level keys every scan.json must contain (v2 schema).
REQUIRED_SCAN_FIELDS = [
    "paper", "checklist", "claims", "methodology_tags",
    "key_findings", "red_flags", "cited_papers",
]


def load_registry(path=None):
    """Load the JSONL registry and return its entries as a list of dicts.

    Args:
        path: Registry file to read; defaults to REGISTRY_PATH.

    Raises:
        ValueError: A non-blank line is not valid JSON. The message names the
            file and line number.
    """
    path = REGISTRY_PATH if path is None else path
    entries = []
    with open(path, encoding="utf-8") as f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                entries.append(json.loads(line))
            except json.JSONDecodeError as e:
                raise ValueError(f"{path}:{lineno}: invalid JSON: {e}") from e
    return entries


def save_registry(entries, path=None):
    """Write *entries* to the registry, one JSON object per line.

    The data is written to a sibling ``.tmp`` file and moved into place with
    os.replace, so an interrupted write cannot truncate the existing registry.

    Args:
        entries: Iterable of JSON-serialisable dicts.
        path: Registry file to write; defaults to REGISTRY_PATH.
    """
    path = Path(REGISTRY_PATH if path is None else path)
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        # UTF-8 is required because entries are dumped with ensure_ascii=False.
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.writelines(
                json.dumps(entry, ensure_ascii=False) + "\n" for entry in entries
            )
        os.replace(tmp_path, path)
    finally:
        # Only still present if the write or the replace failed.
        if tmp_path.exists():
            tmp_path.unlink()


def check_text_quality(text, page_count):
    """Apply the extraction-quality heuristics to *text*.

    Args:
        text: Full extracted text of the document.
        page_count: Number of pages the text came from (0 is treated as 1).

    Returns:
        None if the text passes, otherwise a short description of the first
        failed check (too short, too few words per page, or garbled).
    """
    if len(text) < MIN_CHARS:
        return f"too short ({len(text)} chars)"

    words_per_page = len(text.split()) / max(page_count, 1)
    if words_per_page < MIN_WORDS_PER_PAGE:
        return f"too few words/page ({words_per_page:.0f})"

    # Non-ASCII characters that are not letters (accented letters are fine).
    garbled = sum(1 for c in text if ord(c) > 127 and not c.isalpha())
    garble_ratio = garbled / max(len(text), 1)
    if garble_ratio > MAX_GARBLE_RATIO:
        return f"garbled ({garble_ratio:.1%} non-ASCII)"

    return None


def ensure_text(entry):
    """Extract text if paper.txt doesn't exist. Returns (ok, reason).

    An existing paper.txt larger than MIN_CHARS bytes is reused. Otherwise the
    text is extracted from paper.pdf with PyMuPDF and written to paper.txt
    only if it passes check_text_quality.
    """
    paper_dir = PAPERS_DIR / entry["id"]
    pdf_path = paper_dir / "paper.pdf"
    txt_path = paper_dir / "paper.txt"

    if txt_path.exists() and txt_path.stat().st_size > MIN_CHARS:
        return True, "already extracted"

    if not pdf_path.exists():
        return False, "no PDF"

    try:
        # Imported lazily so the rest of the script works without PyMuPDF.
        import fitz

        doc = fitz.open(str(pdf_path))
        try:
            pages = [page.get_text() for page in doc]
        finally:
            # Close the document even if a page fails to render.
            doc.close()
        text = "\n\n".join(pages)

        problem = check_text_quality(text, len(pages))
        if problem:
            return False, f"pymupdf: {problem}"

        txt_path.write_text(text, encoding="utf-8")
        return True, f"extracted {len(text)} chars"

    except Exception as e:
        return False, f"pymupdf error: {e}"


def build_scan_prompt(entry, paper_text):
    """Return the prompt sent to the scan agent for *entry* and its text."""
    registry_json = json.dumps(entry, indent=2, ensure_ascii=False)
    return f"""You are the scan agent. Read your full instructions at agents/scan-agent.md and the schema at schema/scan.schema.json.

Scan this paper and write the result to papers/{entry['id']}/scan.json.

## Registry Entry
```json
{registry_json}
```

## Paper Text
{paper_text}
"""


def validate_scan_file(scan_path):
    """Validate a scan.json file. Returns (ok, reason).

    Checks, in order: the file parses as JSON, the top level is an object,
    all REQUIRED_SCAN_FIELDS are present, and (when scripts/validate-scan.py
    exists) the external schema validator exits with status 0. The file itself
    is never modified or removed here.
    """
    try:
        with open(scan_path, encoding="utf-8") as f:
            scan = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        return False, f"invalid JSON: {e}"

    if not isinstance(scan, dict):
        return False, f"scan.json must be a JSON object, got {type(scan).__name__}"

    # Basic schema validation (check required fields per v2 schema)
    missing = [r for r in REQUIRED_SCAN_FIELDS if r not in scan]
    if missing:
        return False, f"missing fields: {missing}"

    # Full schema validation is optional: skip it when the validator is absent.
    if not VALIDATOR_SCRIPT.exists():
        return True, "scanned (validate-scan.py not found; basic checks only)"

    # Run the validator under the current interpreter so it sees the same
    # installed packages as this script.
    validate_result = subprocess.run(
        [sys.executable or "python3", str(VALIDATOR_SCRIPT), str(scan_path)],
        capture_output=True, text=True, encoding="utf-8", errors="replace",
        cwd=str(ROOT),
    )
    if validate_result.returncode != 0:
        return False, (
            "schema validation failed: "
            f"{validate_result.stdout[:500]}{validate_result.stderr[:500]}"
        )

    return True, "scanned"


def _discard(path):
    """Delete *path*, ignoring the case where it does not exist."""
    try:
        path.unlink()
    except FileNotFoundError:
        pass


def run_scan_agent(entry, max_turns=8):
    """Run the scan agent on a single paper. Returns (ok, reason).

    If scan.json already exists the paper is reported as already scanned and
    the agent is not run. On any failure, a scan.json produced by this run is
    deleted so that an unvalidated file is never mistaken for a finished scan.
    """
    txt_path = PAPERS_DIR / entry["id"] / "paper.txt"
    scan_path = PAPERS_DIR / entry["id"] / "scan.json"

    if scan_path.exists():
        return True, "already scanned"

    try:
        paper_text = txt_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        return False, f"cannot read paper.txt: {e}"
    # Strip null bytes from bad text extraction
    paper_text = paper_text.replace("\x00", "")

    prompt = build_scan_prompt(entry, paper_text)

    try:
        result = subprocess.run(
            [
                "claude", "-p", "-",
                "--model", "opus",
                "--allowedTools", "Read,Write,Edit",
                "--max-turns", str(max_turns),
            ],
            input=prompt,
            capture_output=True, text=True,
            # The prompt can contain arbitrary Unicode; don't depend on locale.
            encoding="utf-8", errors="replace",
            timeout=SCAN_TIMEOUT_SECONDS,
            cwd=str(ROOT),
        )
    except subprocess.TimeoutExpired:
        ok, reason = False, f"timeout ({SCAN_TIMEOUT_SECONDS}s)"
    except FileNotFoundError:
        # Only the claude call is inside this try, so this is unambiguous.
        ok, reason = False, "'claude' CLI not found"
    except Exception as e:
        ok, reason = False, f"error: {e}"
    else:
        if result.returncode != 0:
            ok, reason = False, f"claude exit {result.returncode}: {result.stderr[:200]}"
        elif not scan_path.exists():
            ok, reason = False, "scan.json not created"
        else:
            try:
                ok, reason = validate_scan_file(scan_path)
            except Exception as e:
                ok, reason = False, f"error: {e}"

    if not ok:
        # scan.json did not exist before this run, so anything present now is
        # partial or invalid output from the failed attempt.
        _discard(scan_path)
    return ok, reason


def scan_one(entry, max_turns=8):
    """Full pipeline for one paper: extract text → scan → return result.

    Returns:
        (paper_id, ok, reason)
    """
    paper_id = entry["id"]

    # Step 1: ensure text
    ok, reason = ensure_text(entry)
    if not ok:
        return paper_id, False, f"text extraction failed: {reason}"

    # Step 2: run scan
    ok, reason = run_scan_agent(entry, max_turns=max_turns)
    return paper_id, ok, reason


def _scan_safely(entry, max_turns):
    """Call scan_one, converting an unexpected exception into a failure result.

    Keeps one bad paper from aborting the whole batch before the registry is
    updated for the papers that did succeed.
    """
    try:
        return scan_one(entry, max_turns)
    except Exception as e:
        return entry.get("id", "<unknown>"), False, f"error: {e}"


def parse_args(argv=None):
    """Parse command-line options.

    Args:
        argv: Argument list without the program name; defaults to sys.argv[1:].

    Returns:
        Namespace with paper_id, limit, dry_run, parallel and max_turns.
        Unrecognised arguments are reported on stderr and otherwise ignored.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--id", dest="paper_id", metavar="ID",
                        help="scan only this paper (regardless of status)")
    parser.add_argument("--limit", type=int, metavar="N",
                        help="scan at most the first N candidates")
    parser.add_argument("--dry-run", action="store_true",
                        help="show what would be scanned and exit")
    parser.add_argument("--parallel", type=int, default=1, metavar="N",
                        help="run N scans concurrently (default: 1)")
    parser.add_argument("--max-turns", type=int, default=8, metavar="N",
                        help="turn budget passed to the claude CLI (default: 8)")

    args, unknown = parser.parse_known_args(argv)
    if unknown:
        print(f"Warning: ignoring unrecognised arguments: {' '.join(unknown)}",
              file=sys.stderr)
    if args.limit is not None and args.limit < 0:
        parser.error("--limit must be >= 0")
    return args


def select_candidates(entries, specific_id=None, limit=None):
    """Pick the registry entries to scan.

    With *specific_id*, the matching entry is selected whatever its status.
    Otherwise an entry is selected when its status is 'downloaded' and it has
    no scan.json yet. *limit*, when not None, keeps only the first N.
    """
    candidates = []
    for entry in entries:
        if specific_id:
            if entry["id"] == specific_id:
                candidates.append(entry)
            continue
        if entry.get("status") != "downloaded":
            continue
        if (PAPERS_DIR / entry["id"] / "scan.json").exists():
            continue
        candidates.append(entry)

    if limit is not None:
        candidates = candidates[:limit]
    return candidates


def main(argv=None):
    """Command-line entry point: select papers, scan them, update the registry."""
    args = parse_args(argv)

    entries = load_registry()
    candidates = select_candidates(entries, args.paper_id, args.limit)

    if not candidates:
        print("No papers to scan.")
        return

    verb = "Would scan" if args.dry_run else "Scanning"
    parallel_note = f" (parallel={args.parallel})" if args.parallel > 1 else ""
    print(f"{verb} {len(candidates)} paper(s){parallel_note}:\n")

    if args.dry_run:
        for entry in candidates:
            txt_exists = (PAPERS_DIR / entry["id"] / "paper.txt").exists()
            print(f"  {entry['id']}  {'(text ready)' if txt_exists else '(needs extraction)'}")
        return

    scanned_count = 0
    failures = []

    if args.parallel > 1:
        with ThreadPoolExecutor(max_workers=args.parallel) as executor:
            futures = [
                executor.submit(_scan_safely, entry, args.max_turns)
                for entry in candidates
            ]
            for future in as_completed(futures):
                paper_id, ok, reason = future.result()
                if ok:
                    scanned_count += 1
                    print(f"  OK: {paper_id} — {reason}")
                else:
                    failures.append((paper_id, reason))
                    print(f"  FAIL: {paper_id} — {reason}")
    else:
        for i, entry in enumerate(candidates, start=1):
            print(f"[{i}/{len(candidates)}] {entry['id']}")
            paper_id, ok, reason = _scan_safely(entry, args.max_turns)
            if ok:
                scanned_count += 1
                print(f"  OK: {reason}")
            else:
                failures.append((paper_id, reason))
                print(f"  FAIL: {reason}")

    # Update registry for successful scans. Reload in case the registry was
    # modified while the scans were running.
    entries = load_registry()
    failed_ids = {paper_id for paper_id, _ in failures}
    scanned_ids = set()
    for entry in entries:
        # Never promote a paper whose scan failed in this run.
        if entry["id"] in failed_ids:
            continue
        scan_path = PAPERS_DIR / entry["id"] / "scan.json"
        if entry.get("status") == "downloaded" and scan_path.exists():
            entry["status"] = "scanned"
            scanned_ids.add(entry["id"])
    if scanned_ids:
        save_registry(entries)

    print(f"\nDone. Scanned: {scanned_count}, Failed: {len(failures)}")
    if scanned_ids:
        print(f"Registry updated: {len(scanned_ids)} entries → 'scanned'")

    if failures:
        failure_path = ROOT / "scan-failures.txt"
        with open(failure_path, "w", encoding="utf-8") as f:
            f.write(f"# Scan failures ({len(failures)} total)\n\n")
            for paper_id, reason in failures:
                f.write(f"{paper_id}\n  {reason}\n\n")
        print(f"Failure log: {failure_path}")


if __name__ == "__main__":
    main()