commit 7dd5ec90a306d461240d147b5312e9fddc153ba1
parent 402389afc4dfd249919a7689faa430b7ce0bb9ac
Author: Brian Graham <brian@buildingbetterteams.de>
Date: Fri, 3 Apr 2026 20:30:24 +0200
Add parallel execution to harness (-j flag)
Usage: python3 harness/run.py grid.yaml main_effects -j 2
Runs up to N experiments concurrently with rolling parallelism (as soon
as one finishes, the next starts). Logging, index writes, and progress
tracking are thread-safe.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Diffstat:
| M | harness/run.py | | | 242 | ++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------- |
1 file changed, 155 insertions(+), 87 deletions(-)
diff --git a/harness/run.py b/harness/run.py
@@ -5,12 +5,14 @@ Computes the experiment grid, creates isolated workspaces, invokes claude,
runs evaluation, and stores results.
Usage:
- python3 run.py [grid_file] [profile_or_design]
+ python3 run.py [grid_file] [profile_or_design] [-j N]
profile_or_design can be:
- A profile name from grid.yaml (e.g., smoke, core, full)
- A DOE design: main_effects, plackett_burman
- interaction_hunt:axis1,axis2,axis3
+
+ -j N: run N experiments in parallel (default 1)
"""
import json
@@ -20,7 +22,9 @@ import subprocess
import sys
import tarfile
import tempfile
+import threading
import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
@@ -317,9 +321,116 @@ def archive_workspace(workspace: Path, run_dir: Path):
pass
+# Locks that serialize output and bookkeeping when runs execute in parallel.
+_print_lock = threading.Lock()  # held while a log line is printed
+_index_lock = threading.Lock()  # held while appending to index.jsonl
+_counter_lock = threading.Lock()  # held while the run counters are updated
+
+
+def log(msg: str):
+    """Print ``msg`` to stdout and flush it, holding ``_print_lock`` meanwhile.
+
+    Taking the lock keeps lines from concurrent runs from interleaving.
+    """
+    with _print_lock:
+        print(msg, file=sys.stdout, flush=True)
+
+
+def run_single(
+    cell: dict,
+    run_num: int,
+    results_dir: Path,
+    project_dir: Path,
+    claude_version: str,
+) -> str:
+    """Execute one (cell, run number) experiment and report its outcome.
+
+    Args:
+        cell: Grid cell. Must contain "cell_id", "task", "model" and
+            "prompt_style"; the whole dict is copied into meta.json.
+        run_num: Run number within the cell; becomes part of the run id.
+        results_dir: Results root. The run writes into
+            ``results_dir / "runs" / run_id`` and appends one line to
+            ``results_dir / "index.jsonl"``.
+        project_dir: Project root, handed to the workspace/claude helpers
+            and used to locate ``tasks/<task>``.
+        claude_version: Version string recorded in meta.json.
+
+    Returns:
+        "skipped" if eval_results.json already exists for this run,
+        "failed" if the workspace could not be created or no
+        eval_results.json exists after evaluation, otherwise "completed".
+
+    Exceptions raised by invoke_claude, evaluate, or the index write are
+    not caught here; they propagate after the workspace has been archived.
+    """
+    cell_id = cell["cell_id"]
+    task = cell["task"]
+    model = cell["model"]
+    prompt_style = cell["prompt_style"]
+    run_id = f"{cell_id}_run{run_num}"
+    run_dir = results_dir / "runs" / run_id
+
+    # Resume support: an existing eval result means this run already finished.
+    if (run_dir / "eval_results.json").exists():
+        log(f"SKIP: {run_id}")
+        return "skipped"
+
+    log(f"START: {task} | {model} | {prompt_style} | run{run_num}")
+
+    run_dir.mkdir(parents=True, exist_ok=True)
+
+    # Save meta before doing any work so a crashed run still leaves a record.
+    meta = {
+        **cell,
+        "run_id": run_id,
+        "run_number": run_num,
+        "claude_version": claude_version,
+        "started_at": datetime.now(timezone.utc).isoformat(),
+    }
+    (run_dir / "meta.json").write_text(json.dumps(meta, indent=2))
+
+    # Create workspace
+    try:
+        workspace = create_workspace(project_dir, task, cell)
+    except Exception as e:
+        log(f" ERROR creating workspace for {run_id}: {e}")
+        return "failed"
+
+    try:
+        # Invoke claude
+        start_time = time.time()
+        exit_code = invoke_claude(cell, workspace, run_dir, project_dir)
+        wall_time = int(time.time() - start_time)
+
+        status = "ok" if exit_code == 0 else f"exit {exit_code}"
+
+        # Update meta with timing
+        meta["wall_time_seconds"] = wall_time
+        meta["exit_code"] = exit_code
+        meta["completed_at"] = datetime.now(timezone.utc).isoformat()
+        (run_dir / "meta.json").write_text(json.dumps(meta, indent=2))
+
+        # Evaluate
+        task_dir = project_dir / "tasks" / task
+        evaluate(task_dir, workspace, cell, run_dir)
+
+        # Append to index (thread-safe): one lock holder writes at a time.
+        index_entry = {
+            "run_id": run_id,
+            "task": task,
+            "model": model,
+            "cell_id": cell_id,
+            "completed_at": meta["completed_at"],
+        }
+        with _index_lock:
+            with open(results_dir / "index.jsonl", "a") as f:
+                f.write(json.dumps(index_entry) + "\n")
+    finally:
+        # Archive and cleanup. Done in ``finally`` so that a step above that
+        # raises does not skip it and leave the workspace behind.
+        archive_workspace(workspace, run_dir)
+
+    # The run counts as completed only if evaluation produced its result file.
+    result = "completed" if (run_dir / "eval_results.json").exists() else "failed"
+    log(f" DONE: {task} | {model} | {prompt_style} | run{run_num} | {status} | {wall_time}s | {result}")
+    return result
+
+
def main():
- grid_file = sys.argv[1] if len(sys.argv) > 1 else str(PROJECT_DIR / "grid.yaml")
- profile = sys.argv[2] if len(sys.argv) > 2 else "smoke"
+ # Parse args
+ args = sys.argv[1:]
+ parallel = 1
+ grid_file = str(PROJECT_DIR / "grid.yaml")
+ profile = "smoke"
+
+ i = 0
+ positional = []
+ while i < len(args):
+ if args[i] == "-j" and i + 1 < len(args):
+ parallel = int(args[i + 1])
+ i += 2
+ else:
+ positional.append(args[i])
+ i += 1
+
+ if len(positional) >= 1:
+ grid_file = positional[0]
+ if len(positional) >= 2:
+ profile = positional[1]
+
results_dir = PROJECT_DIR / "results"
results_dir.mkdir(exist_ok=True)
(results_dir / "runs").mkdir(exist_ok=True)
@@ -344,6 +455,7 @@ def main():
print("=" * 40)
print(f"Grid file: {grid_file}")
print(f"Profile: {profile}")
+ print(f"Parallel: {parallel}")
print(f"Results: {results_dir}")
print("=" * 40)
@@ -364,103 +476,59 @@ def main():
cells = compute_cells(grid, profile)
print(f"Profile: {profile}")
- print(f"Grid cells: {len(cells)}")
+ # Build the full list of (cell, run_num) jobs
+ jobs = []
+ for cell in cells:
+ runs_per_cell = cell.get("runs_per_cell", 3)
+ for run_num in range(1, runs_per_cell + 1):
+ jobs.append((cell, run_num))
+
+ print(f"Total jobs: {len(jobs)}")
print()
completed = 0
skipped = 0
failed = 0
- for cell in cells:
- task = cell["task"]
- cell_id = cell["cell_id"]
- runs_per_cell = cell.get("runs_per_cell", 3)
- model = cell["model"]
- prompt_style = cell["prompt_style"]
-
- for run_num in range(1, runs_per_cell + 1):
- run_id = f"{cell_id}_run{run_num}"
- run_dir = results_dir / "runs" / run_id
-
- # Resume support
- if (run_dir / "eval_results.json").exists():
- print(f"SKIP: {run_id}")
+ if parallel <= 1:
+ # Sequential
+ for cell, run_num in jobs:
+ result = run_single(cell, run_num, results_dir, PROJECT_DIR, claude_version)
+ if result == "completed":
+ completed += 1
+ elif result == "skipped":
skipped += 1
- continue
-
- print("-" * 40)
- print(f"RUN: {run_id}")
- print(f"Task: {task} | Model: {model} | Prompt: {prompt_style}")
- print("-" * 40)
-
- run_dir.mkdir(parents=True, exist_ok=True)
-
- # Save meta
- meta = {
- **cell,
- "run_id": run_id,
- "run_number": run_num,
- "claude_version": claude_version,
- "started_at": datetime.now(timezone.utc).isoformat(),
- }
- (run_dir / "meta.json").write_text(json.dumps(meta, indent=2))
-
- # Create workspace
- print(" Creating workspace...")
- try:
- workspace = create_workspace(PROJECT_DIR, task, cell)
- print(f" Workspace: {workspace}")
- except Exception as e:
- print(f" ERROR creating workspace: {e}")
- failed += 1
- continue
-
- # Invoke claude
- print(f" Invoking claude (model={model})...")
- start_time = time.time()
- exit_code = invoke_claude(cell, workspace, run_dir, PROJECT_DIR)
- wall_time = int(time.time() - start_time)
-
- if exit_code == 0:
- print(" Claude completed successfully")
else:
- print(f" Claude exited with error (exit code: {exit_code})")
-
- # Update meta with timing
- meta["wall_time_seconds"] = wall_time
- meta["exit_code"] = exit_code
- meta["completed_at"] = datetime.now(timezone.utc).isoformat()
- (run_dir / "meta.json").write_text(json.dumps(meta, indent=2))
-
- # Evaluate
- print(" Running evaluation...")
- task_dir = PROJECT_DIR / "tasks" / task
- evaluate(task_dir, workspace, cell, run_dir)
- print(" Evaluation complete")
-
- # Append to index
- index_entry = {
- "run_id": run_id,
- "task": task,
- "model": model,
- "cell_id": cell_id,
- "completed_at": meta["completed_at"],
+ failed += 1
+ else:
+ # Parallel with rolling concurrency
+ with ThreadPoolExecutor(max_workers=parallel) as executor:
+ futures = {
+ executor.submit(
+ run_single, cell, run_num, results_dir, PROJECT_DIR, claude_version
+ ): (cell, run_num)
+ for cell, run_num in jobs
}
- with open(results_dir / "index.jsonl", "a") as f:
- f.write(json.dumps(index_entry) + "\n")
- # Archive and cleanup
- print(" Archiving workspace...")
- archive_workspace(workspace, run_dir)
+ for future in as_completed(futures):
+ try:
+ result = future.result()
+ except Exception as e:
+ log(f" ERROR: {e}")
+ result = "failed"
- if (run_dir / "eval_results.json").exists():
- completed += 1
- else:
- failed += 1
+ with _counter_lock:
+ if result == "completed":
+ completed += 1
+ elif result == "skipped":
+ skipped += 1
+ else:
+ failed += 1
- print(f" Done. ({completed} completed, {skipped} skipped, {failed} failed)")
- print()
+ total_done = completed + skipped + failed
+ log(f" Progress: {total_done}/{len(jobs)} ({completed} completed, {skipped} skipped, {failed} failed)")
+ print()
print("=" * 40)
print("All runs complete.")
print(f"Completed: {completed} | Skipped: {skipped} | Failed: {failed}")