pca-analysis.py (8712B)
#!/usr/bin/env python3
"""PCA analysis of benchmark runs.

Loads all scored runs, one-hot encodes their categorical axis values,
standardizes the resulting features, runs PCA keeping up to
``MAX_COMPONENTS`` principal components, and saves the results to
results/analysis/pca.json.

Usage:
    python3 harness/pca-analysis.py
"""

import json
import math
import sys
from pathlib import Path

import numpy as np
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

PROJECT_DIR = Path(__file__).resolve().parent.parent
RESULTS_DIR = PROJECT_DIR / "results"
RUNS_DIR = RESULTS_DIR / "runs"
OUTPUT_PATH = RESULTS_DIR / "analysis" / "pca.json"

# Upper bound on the number of principal components reported per run.
MAX_COMPONENTS = 10

# Axes to include in PCA (must match meta.json keys)
AXES = [
    "model",
    "effort",
    "prompt_style",
    "language",
    "human_language",
    "tool_read",
    "tool_write",
    "tool_edit",
    "tool_glob",
    "tool_grep",
    "linter",
    "playwright",
    "context_file",
    "web_search",
    "max_budget",
    "tests_provided",
    "strategy",
    "design_guidance",
    "architecture",
    "error_checking",
    "context_noise",
    "renderer",
    "provider",
]

# Defaults for axes that may not exist in older runs
AXIS_DEFAULTS = {
    "strategy": "none",
    "tests_provided": "none",
    "design_guidance": "none",
    "architecture": "none",
    "error_checking": "none",
    "context_noise": "clean",
    "renderer": "none",
    "provider": "anthropic",
}


# Normalize old schema values (same as dashboard data.ts)
def normalize_meta(meta: dict) -> dict:
    """Upgrade an old-schema ``meta`` dict in place and return it.

    - A legacy ``sub_agents`` value is translated into ``strategy`` when no
      strategy is set ("on" -> "use_subagents", anything else -> "none").
    - ``playwright == "on"`` is rewritten to "available".
    - Every key of ``AXIS_DEFAULTS`` that is missing or ``None`` receives its
      default value.

    Args:
        meta: Parsed contents of a run's meta.json. Mutated in place.

    Returns:
        The same ``meta`` object, for call-chaining convenience.
    """
    if meta.get("sub_agents") and not meta.get("strategy"):
        meta["strategy"] = "use_subagents" if meta["sub_agents"] == "on" else "none"
    if meta.get("playwright") == "on":
        meta["playwright"] = "available"
    for key, default in AXIS_DEFAULTS.items():
        if meta.get(key) is None:
            meta[key] = default
    return meta


def _warn_skip(run_name: str, reason: object) -> None:
    """Report on stderr that a run directory was ignored and why."""
    print(f"  Warning: skipping {run_name}: {reason}", file=sys.stderr)


def load_runs() -> list[dict]:
    """Load all runs that have both meta.json and eval_results.json with a score.

    Run directories are visited in sorted order. A run is skipped when either
    file is missing, when ``score`` is absent or ``None``, or (with a warning
    on stderr) when a file is unreadable, is not a JSON object, or carries a
    score that is not a finite number.

    Returns:
        One dict per usable run with the keys ``meta`` (normalized metadata),
        ``score`` (float), ``run_id`` and ``short_id`` (both falling back to
        the directory name when absent from the metadata).
    """
    runs: list[dict] = []
    # is_dir() rather than exists(): iterdir() would fail on a regular file.
    if not RUNS_DIR.is_dir():
        return runs

    for run_dir in sorted(RUNS_DIR.iterdir()):
        if not run_dir.is_dir():
            continue
        meta_path = run_dir / "meta.json"
        eval_path = run_dir / "eval_results.json"
        if not (meta_path.exists() and eval_path.exists()):
            continue

        try:
            meta = json.loads(meta_path.read_text(encoding="utf-8"))
            eval_results = json.loads(eval_path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as err:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            _warn_skip(run_dir.name, err)
            continue

        if not isinstance(meta, dict) or not isinstance(eval_results, dict):
            _warn_skip(run_dir.name, "meta.json/eval_results.json is not a JSON object")
            continue

        raw_score = eval_results.get("score")
        if raw_score is None:
            continue
        try:
            score = float(raw_score)
        except (TypeError, ValueError):
            _warn_skip(run_dir.name, f"non-numeric score {raw_score!r}")
            continue
        # json.loads accepts NaN/Infinity, but writing them back out would
        # produce a pca.json that strict JSON parsers reject.
        if not math.isfinite(score):
            _warn_skip(run_dir.name, f"non-finite score {raw_score!r}")
            continue

        meta = normalize_meta(meta)
        runs.append({
            "meta": meta,
            "score": score,
            "run_id": meta.get("run_id", run_dir.name),
            "short_id": meta.get("short_id", run_dir.name[:8]),
        })

    return runs


def _axis_value(meta: dict, axis: str) -> str:
    """Return the string form of ``meta[axis]``, using the axis default or "unknown"."""
    return str(meta.get(axis, AXIS_DEFAULTS.get(axis, "unknown")))


def build_feature_matrix(runs: list[dict]) -> tuple[np.ndarray, list[str], list[str], dict[str, list[str]]]:
    """One-hot encode all axis values.

    Axes on which every run has the same value are dropped (and reported on
    stdout) because they carry no variance.

    Returns:
        matrix: (n_runs, n_features) array
        feature_names: list of feature names like "model_haiku"
        axis_for_feature: which axis each feature belongs to
        axis_features: mapping from axis name to list of feature names
    """
    # Collect all unique values per axis
    axis_values: dict[str, set[str]] = {ax: set() for ax in AXES}
    for run in runs:
        for ax in AXES:
            axis_values[ax].add(_axis_value(run["meta"], ax))

    # Sort values for determinism; skip axes with only one unique value
    # (no variance to explain)
    active_axes: dict[str, list[str]] = {
        ax: sorted(vals) for ax, vals in axis_values.items() if len(vals) > 1
    }
    skipped = [ax for ax in AXES if ax not in active_axes]
    if skipped:
        print(f"  Skipping constant axes: {', '.join(skipped)}")

    # Build feature names and axis mapping, remembering each (axis, value)
    # pair's column so the fill loop below needs no name lookups.
    feature_names: list[str] = []
    axis_for_feature: list[str] = []
    axis_features: dict[str, list[str]] = {}
    column_of: dict[tuple[str, str], int] = {}

    for ax, vals in active_axes.items():
        names = [f"{ax}_{val}" for val in vals]
        axis_features[ax] = names
        for val in vals:
            column_of[(ax, val)] = len(feature_names) + len(column_of) - len(feature_names)
        feature_names.extend(names)
        axis_for_feature.extend([ax] * len(names))

    # Build the matrix: exactly one 1.0 per run within each active axis.
    matrix = np.zeros((len(runs), len(feature_names)), dtype=np.float64)
    for i, run in enumerate(runs):
        for ax in active_axes:
            matrix[i, column_of[(ax, _axis_value(run["meta"], ax))]] = 1.0

    return matrix, feature_names, axis_for_feature, axis_features


def run_pca(runs: list[dict], max_components: int = MAX_COMPONENTS) -> dict:
    """Run PCA and return structured results.

    Args:
        runs: Run records as produced by ``load_runs``.
        max_components: Upper bound on the number of components kept; the
            actual count is also limited by the number of features and runs.

    Returns:
        A JSON-serializable dict with ``n_runs``, ``n_features``,
        ``n_components``, ``variance_explained`` (percent per kept
        component), ``scree`` (percent per component of the full
        decomposition), ``points`` (one entry per run), ``loadings`` (one
        entry per feature) and ``axis_importance`` (summed absolute loadings
        per axis, sorted by descending total).

    Raises:
        ValueError: If no axis varies across the runs, so there is nothing
            to decompose.
    """
    matrix, feature_names, axis_for_feature, axis_features = build_feature_matrix(runs)

    n_runs, n_features = matrix.shape
    if n_features == 0:
        raise ValueError("no axis varies across the loaded runs; nothing to analyse")
    n_components = min(max_components, n_features, n_runs)

    print(f"  {n_runs} runs, {n_features} features, {n_components} components")

    # Standardize features (zero mean, unit variance). Every column holds
    # both 0s and 1s because single-valued axes were dropped.
    matrix_scaled = StandardScaler().fit_transform(matrix)

    # Truncated PCA supplies the per-run coordinates and the loadings.
    pca = PCA(n_components=n_components)
    transformed = pca.fit_transform(matrix_scaled)

    # Also run full PCA for scree plot
    pca_full = PCA(n_components=min(n_features, n_runs))
    pca_full.fit(matrix_scaled)
    scree = [round(float(v) * 100, 2) for v in pca_full.explained_variance_ratio_]

    # Variance explained
    variance_explained = [round(float(v) * 100, 2) for v in pca.explained_variance_ratio_]
    print(f"  Variance explained: {variance_explained}")
    print(f"  Cumulative ({n_components} PCs): {round(sum(scree[:n_components]), 1)}%")

    # Points (one per run)
    pc_labels = [f"pc{i + 1}" for i in range(n_components)]
    points = []
    for i, run in enumerate(runs):
        meta = run["meta"]
        point = {
            "run_id": run["run_id"],
            "short_id": run["short_id"],
            "model": meta.get("model", "unknown"),
            "score": round(run["score"], 4),
        }
        for j, label in enumerate(pc_labels):
            point[label] = round(float(transformed[i, j]), 4)
        # Add a config summary for tooltip
        summary_parts = []
        for ax in ["effort", "prompt_style", "language", "max_budget", "strategy"]:
            val = meta.get(ax, "")
            if val and val != "none":
                summary_parts.append(f"{ax}={val}")
        point["config_summary"] = ", ".join(summary_parts)
        points.append(point)

    # Raw loadings (one per feature per PC)
    loadings = []
    for j, (fname, ax) in enumerate(zip(feature_names, axis_for_feature)):
        loading = {"feature": fname, "axis": ax}
        for k, label in enumerate(pc_labels):
            loading[label] = round(float(pca.components_[k, j]), 4)
        loadings.append(loading)

    # Group feature columns by axis once, by position rather than by name,
    # so the summary below needs no repeated list searches.
    axis_columns: dict[str, list[int]] = {ax: [] for ax in axis_features}
    for j, ax in enumerate(axis_for_feature):
        axis_columns[ax].append(j)

    # Axis-level summaries: sum of absolute loadings per axis per PC
    axis_importance: list[dict] = []
    for ax, columns in axis_columns.items():
        entry: dict = {"axis": ax}
        for k, label in enumerate(pc_labels):
            total = sum(abs(float(pca.components_[k, j])) for j in columns)
            entry[label] = round(total, 4)
        # Total importance across all PCs, used as the sort key
        entry["total"] = round(sum(entry[label] for label in pc_labels), 4)
        axis_importance.append(entry)

    axis_importance.sort(key=lambda x: x["total"], reverse=True)

    return {
        "n_runs": n_runs,
        "n_features": n_features,
        "n_components": n_components,
        "variance_explained": variance_explained,
        "scree": scree,
        "points": points,
        "loadings": loadings,
        "axis_importance": axis_importance,
    }


def main():
    """Load the runs, run the PCA and write the results to ``OUTPUT_PATH``.

    Exits with status 1 when fewer than three scored runs are available or
    when the PCA cannot be computed.
    """
    print("PCA Analysis")
    print("=" * 40)

    runs = load_runs()
    if len(runs) < 3:
        print(f"  Only {len(runs)} runs with scores found. Need at least 3 for PCA.")
        sys.exit(1)

    print(f"  Loaded {len(runs)} runs with scores")

    try:
        results = run_pca(runs)
    except ValueError as err:
        print(f"  PCA failed: {err}")
        sys.exit(1)

    OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    OUTPUT_PATH.write_text(json.dumps(results, indent=2), encoding="utf-8")
    print(f"\n  Saved to {OUTPUT_PATH}")
    print("  Done.")


if __name__ == "__main__":
    main()