ai-research-survey

Systematic scan of agentic development research. What's signal, what's noise.
git clone https://git.shiptheloop.com/ai-research-survey.git
Log | Files | Refs

run-scan.py (9547B)


      1 #!/usr/bin/env python3
      2 """
      3 Orchestrate the scan pipeline: extract text → run scan agent → validate output.
      4 
      5 For each paper with status 'downloaded':
      6 1. Extract text if paper.txt doesn't exist (calls extract-text.py logic)
      7 2. Run the scan agent via claude CLI
      8 3. Validate scan.json against the schema
      9 4. Update registry status to 'scanned'
     10 
     11 Usage:
     12     python scripts/run-scan.py                    # All downloaded papers
     13     python scripts/run-scan.py --id metr-rct-2025 # Specific paper
     14     python scripts/run-scan.py --limit 10         # First N papers
     15     python scripts/run-scan.py --dry-run          # Show what would be scanned
     16     python scripts/run-scan.py --parallel 4       # Run N scans concurrently
     17 """
     18 
     19 import json
     20 import subprocess
     21 import sys
     22 import os
     23 from concurrent.futures import ThreadPoolExecutor, as_completed
     24 from pathlib import Path
     25 
     26 ROOT = Path(__file__).resolve().parent.parent
     27 REGISTRY_PATH = ROOT / "registry.jsonl"
     28 PAPERS_DIR = ROOT / "papers"
     29 SCAN_AGENT_PROMPT = ROOT / "agents" / "scan-agent.md"
     30 SCAN_SCHEMA = ROOT / "schema" / "scan.schema.json"
     31 
     32 # Heuristics for text extraction quality (duplicated from extract-text.py)
     33 MIN_CHARS = 500
     34 MIN_WORDS_PER_PAGE = 30
     35 MAX_GARBLE_RATIO = 0.15
     36 
     37 
     38 def load_registry():
     39     entries = []
     40     with open(REGISTRY_PATH) as f:
     41         for line in f:
     42             line = line.strip()
     43             if line:
     44                 entries.append(json.loads(line))
     45     return entries
     46 
     47 
     48 def save_registry(entries):
     49     with open(REGISTRY_PATH, "w") as f:
     50         for entry in entries:
     51             f.write(json.dumps(entry, ensure_ascii=False) + "\n")
     52 
     53 
     54 def ensure_text(entry):
     55     """Extract text if paper.txt doesn't exist. Returns (ok, reason)."""
     56     pdf_path = PAPERS_DIR / entry["id"] / "paper.pdf"
     57     txt_path = PAPERS_DIR / entry["id"] / "paper.txt"
     58 
     59     if txt_path.exists() and txt_path.stat().st_size > MIN_CHARS:
     60         return True, "already extracted"
     61 
     62     if not pdf_path.exists():
     63         return False, "no PDF"
     64 
     65     try:
     66         import fitz
     67         doc = fitz.open(str(pdf_path))
     68         pages = [page.get_text() for page in doc]
     69         doc.close()
     70         text = "\n\n".join(pages)
     71 
     72         # Quality check
     73         words = text.split()
     74         words_per_page = len(words) / max(len(pages), 1)
     75         non_ascii = sum(1 for c in text if ord(c) > 127 and not c.isalpha())
     76         garble_ratio = non_ascii / max(len(text), 1)
     77 
     78         if len(text) < MIN_CHARS:
     79             return False, f"pymupdf: too short ({len(text)} chars)"
     80         if words_per_page < MIN_WORDS_PER_PAGE:
     81             return False, f"pymupdf: too few words/page ({words_per_page:.0f})"
     82         if garble_ratio > MAX_GARBLE_RATIO:
     83             return False, f"pymupdf: garbled ({garble_ratio:.1%} non-ASCII)"
     84 
     85         txt_path.write_text(text, encoding="utf-8")
     86         return True, f"extracted {len(text)} chars"
     87 
     88     except Exception as e:
     89         return False, f"pymupdf error: {e}"
     90 
     91 
     92 def run_scan_agent(entry, max_turns=8):
     93     """Run the scan agent on a single paper. Returns (ok, reason)."""
     94     txt_path = PAPERS_DIR / entry["id"] / "paper.txt"
     95     scan_path = PAPERS_DIR / entry["id"] / "scan.json"
     96 
     97     if scan_path.exists():
     98         return True, "already scanned"
     99 
    100     paper_text = txt_path.read_text(encoding="utf-8")
    101     # Strip null bytes from bad text extraction
    102     paper_text = paper_text.replace("\x00", "")
    103 
    104     # Build the prompt
    105     registry_json = json.dumps(entry, indent=2, ensure_ascii=False)
    106     prompt = f"""You are the scan agent. Read your full instructions at agents/scan-agent.md and the schema at schema/scan.schema.json.
    107 
    108 Scan this paper and write the result to papers/{entry['id']}/scan.json.
    109 
    110 ## Registry Entry
    111 ```json
    112 {registry_json}
    113 ```
    114 
    115 ## Paper Text
    116 {paper_text}
    117 """
    118 
    119     try:
    120         result = subprocess.run(
    121             [
    122                 "claude", "-p", "-",
    123                 "--model", "opus",
    124                 "--allowedTools", "Read,Write,Edit",
    125                 "--max-turns", str(max_turns),
    126             ],
    127             input=prompt,
    128             capture_output=True, text=True, timeout=600,
    129             cwd=str(ROOT),
    130         )
    131 
    132         if result.returncode != 0:
    133             return False, f"claude exit {result.returncode}: {result.stderr[:200]}"
    134 
    135         # Check if scan.json was created
    136         if not scan_path.exists():
    137             return False, "scan.json not created"
    138 
    139         # Validate JSON
    140         try:
    141             with open(scan_path) as f:
    142                 scan = json.load(f)
    143         except json.JSONDecodeError as e:
    144             scan_path.unlink()
    145             return False, f"invalid JSON: {e}"
    146 
    147         # Basic schema validation (check required fields per v2 schema)
    148         required = ["paper", "checklist", "claims", "methodology_tags", "key_findings", "red_flags", "cited_papers"]
    149         missing = [r for r in required if r not in scan]
    150         if missing:
    151             scan_path.unlink()
    152             return False, f"missing fields: {missing}"
    153 
    154         # Run full schema validation if available
    155         validate_result = subprocess.run(
    156             ["python3", str(ROOT / "scripts" / "validate-scan.py"), str(scan_path)],
    157             capture_output=True, text=True, cwd=str(ROOT),
    158         )
    159         if validate_result.returncode != 0:
    160             scan_path.unlink()
    161             return False, f"schema validation failed: {validate_result.stdout[:500]}{validate_result.stderr[:500]}"
    162 
    163         return True, "scanned"
    164 
    165     except subprocess.TimeoutExpired:
    166         return False, "timeout (600s)"
    167     except FileNotFoundError:
    168         return False, "'claude' CLI not found"
    169     except Exception as e:
    170         return False, f"error: {e}"
    171 
    172 
    173 def scan_one(entry, max_turns=8):
    174     """Full pipeline for one paper: extract text → scan → return result."""
    175     paper_id = entry["id"]
    176 
    177     # Step 1: ensure text
    178     ok, reason = ensure_text(entry)
    179     if not ok:
    180         return paper_id, False, f"text extraction failed: {reason}"
    181 
    182     # Step 2: run scan
    183     ok, reason = run_scan_agent(entry, max_turns=max_turns)
    184     return paper_id, ok, reason
    185 
    186 
    187 def main():
    188     args = sys.argv[1:]
    189     dry_run = "--dry-run" in args
    190     limit = None
    191     specific_id = None
    192     parallel = 1
    193     max_turns = 8
    194 
    195     for i, arg in enumerate(args):
    196         if arg == "--limit" and i + 1 < len(args):
    197             limit = int(args[i + 1])
    198         if arg == "--id" and i + 1 < len(args):
    199             specific_id = args[i + 1]
    200         if arg == "--parallel" and i + 1 < len(args):
    201             parallel = int(args[i + 1])
    202         if arg == "--max-turns" and i + 1 < len(args):
    203             max_turns = int(args[i + 1])
    204 
    205     entries = load_registry()
    206 
    207     candidates = []
    208     for entry in entries:
    209         if specific_id and entry["id"] != specific_id:
    210             continue
    211         if entry["status"] != "downloaded" and not specific_id:
    212             continue
    213         scan_path = PAPERS_DIR / entry["id"] / "scan.json"
    214         if scan_path.exists() and not specific_id:
    215             continue
    216         candidates.append(entry)
    217 
    218     if limit:
    219         candidates = candidates[:limit]
    220 
    221     if not candidates:
    222         print("No papers to scan.")
    223         return
    224 
    225     print(f"{'Would scan' if dry_run else 'Scanning'} {len(candidates)} paper(s)"
    226           f"{f' (parallel={parallel})' if parallel > 1 else ''}:\n")
    227 
    228     if dry_run:
    229         for entry in candidates:
    230             txt_exists = (PAPERS_DIR / entry["id"] / "paper.txt").exists()
    231             print(f"  {entry['id']} {'(text ready)' if txt_exists else '(needs extraction)'}")
    232         return
    233 
    234     results = {"scanned": 0, "failed": 0, "skipped": 0}
    235     failures = []
    236 
    237     if parallel > 1:
    238         with ThreadPoolExecutor(max_workers=parallel) as executor:
    239             futures = {executor.submit(scan_one, e, max_turns): e for e in candidates}
    240             for future in as_completed(futures):
    241                 paper_id, ok, reason = future.result()
    242                 if ok:
    243                     results["scanned"] += 1
    244                     print(f"  OK: {paper_id} — {reason}")
    245                 else:
    246                     results["failed"] += 1
    247                     failures.append((paper_id, reason))
    248                     print(f"  FAIL: {paper_id} — {reason}")
    249     else:
    250         for i, entry in enumerate(candidates):
    251             print(f"[{i+1}/{len(candidates)}] {entry['id']}")
    252             paper_id, ok, reason = scan_one(entry, max_turns)
    253             if ok:
    254                 results["scanned"] += 1
    255                 print(f"  OK: {reason}")
    256             else:
    257                 results["failed"] += 1
    258                 failures.append((paper_id, reason))
    259                 print(f"  FAIL: {reason}")
    260 
    261     # Update registry for successful scans
    262     entries = load_registry()  # Reload in case of parallel modifications
    263     scanned_ids = set()
    264     for entry in entries:
    265         scan_path = PAPERS_DIR / entry["id"] / "scan.json"
    266         if scan_path.exists() and entry["status"] == "downloaded":
    267             entry["status"] = "scanned"
    268             scanned_ids.add(entry["id"])
    269     save_registry(entries)
    270 
    271     print(f"\nDone. Scanned: {results['scanned']}, Failed: {results['failed']}")
    272     if scanned_ids:
    273         print(f"Registry updated: {len(scanned_ids)} entries → 'scanned'")
    274 
    275     if failures:
    276         failure_path = ROOT / "scan-failures.txt"
    277         with open(failure_path, "w") as f:
    278             f.write(f"# Scan failures ({len(failures)} total)\n\n")
    279             for paper_id, reason in failures:
    280                 f.write(f"{paper_id}\n  {reason}\n\n")
    281         print(f"Failure log: {failure_path}")
    282 
    283 
    284 if __name__ == "__main__":
    285     main()

Impressum · Datenschutz