sonarqube-scan.py (5857B)
#!/usr/bin/env python3
"""SonarQube code analysis for generated Tetris implementations.

Runs sonar-scanner against the workspace and pulls metrics via API.
Requires SonarQube running at localhost:9000.

Usage: python3 sonarqube-scan.py <workspace_path> <project_key>
Output: JSON to stdout
"""

import base64
import http.client
import json
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path


SONAR_URL = "http://localhost:9000"
SONAR_TOKEN_FILE = Path.home() / ".sonarqube-token"

# Metric keys requested from /api/measures/component.
METRIC_KEYS = (
    "bugs", "vulnerabilities", "code_smells",
    "cognitive_complexity", "duplicated_lines_density",
    "ncloc", "sqale_rating", "reliability_rating",
    "security_rating", "sqale_index",
)

# Rating labels (SonarQube uses 1-5 where 1=A, 5=E).
RATING_LABELS = {1.0: "A", 2.0: "B", 3.0: "C", 4.0: "D", 5.0: "E"}

# Compute-engine task states that mean "analysis not finished yet".
_PENDING_STATUSES = ("PENDING", "IN_PROGRESS")

# Failures treated as "request did not succeed" when talking to the server:
# connection/HTTP errors (URLError is an OSError), malformed JSON
# (JSONDecodeError is a ValueError) and truncated/invalid HTTP responses.
_REQUEST_ERRORS = (OSError, ValueError, http.client.HTTPException)


def get_token() -> str:
    """Return the token stored in SONAR_TOKEN_FILE, or "" if it is unreadable."""
    try:
        return SONAR_TOKEN_FILE.read_text().strip()
    except OSError:
        # Missing or unreadable file: callers treat "" as "no token".
        return ""


def _auth_headers(token: str) -> dict:
    """Build the HTTP Basic header with the token as user and empty password."""
    encoded = base64.b64encode(f"{token}:".encode()).decode()
    return {"Authorization": f"Basic {encoded}"}


def _fetch_json(url: str, headers: dict, timeout: float):
    """GET *url* and return the decoded JSON body, closing the response."""
    request = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read())


def scan(workspace: Path, project_key: str, token: str) -> bool:
    """Run sonar-scanner against workspace. Returns True on success.

    Returns False (instead of raising) when the scanner binary or the
    workspace directory is missing, or when the scan exceeds 60 seconds,
    so the caller can report the failure as JSON.
    """
    # NOTE(review): the token is passed as a command-line argument and may be
    # visible in the process list; consider an alternative supported by the
    # installed scanner version.
    cmd = [
        "sonar-scanner",
        f"-Dsonar.projectKey={project_key}",
        "-Dsonar.sources=.",
        f"-Dsonar.host.url={SONAR_URL}",
        f"-Dsonar.login={token}",
        "-Dsonar.exclusions=**/node_modules/**,**/dist/**,**/build/**,**/package-lock.json,**/report/**,.scannerwork/**",
        "-Dsonar.scm.disabled=true",
    ]
    try:
        result = subprocess.run(
            cmd, cwd=workspace, capture_output=True, text=True, timeout=60
        )
    except (OSError, subprocess.SubprocessError):
        # OSError: scanner not installed / bad cwd.
        # SubprocessError: includes TimeoutExpired.
        return False
    return "EXECUTION SUCCESS" in result.stdout


def _has_pending_task(data: dict) -> bool:
    """Return True if the queue or current task is PENDING or IN_PROGRESS."""
    tasks = list(data.get("queue") or [])
    current = data.get("current")
    if current:
        tasks.append(current)
    return any(
        isinstance(task, dict) and task.get("status") in _PENDING_STATUSES
        for task in tasks
    )


def wait_for_analysis(project_key: str, token: str, timeout: int = 30) -> bool:
    """Wait for SonarQube to finish processing.

    Polls /api/ce/component about once per second until no task for the
    project is pending or in progress.

    Args:
        project_key: SonarQube project key to poll.
        token: API token used for Basic authentication.
        timeout: Maximum total time to wait, in seconds.

    Returns:
        True once no task is pending; False if *timeout* elapses first.
    """
    query = urllib.parse.urlencode({"component": project_key})
    url = f"{SONAR_URL}/api/ce/component?{query}"
    headers = _auth_headers(token)

    # A wall-clock deadline keeps the total wait bounded by *timeout* even
    # when individual requests are slow.
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        try:
            data = _fetch_json(url, headers, timeout=min(5.0, max(remaining, 0.5)))
        except _REQUEST_ERRORS:
            # Best effort: a failed poll just means "not ready yet".
            data = None
        if isinstance(data, dict) and not _has_pending_task(data):
            return True
        time.sleep(min(1.0, max(0.0, deadline - time.monotonic())))


def parse_measures(payload) -> dict:
    """Convert an /api/measures/component payload to {metric: float}.

    Measures that lack a metric key or a numeric value are skipped rather
    than failing the whole result. Returns {} for an unexpected payload.
    """
    if not isinstance(payload, dict):
        return {}
    component = payload.get("component")
    if not isinstance(component, dict):
        return {}

    parsed = {}
    for measure in component.get("measures") or []:
        try:
            parsed[measure["metric"]] = float(measure["value"])
        except (KeyError, TypeError, ValueError):
            continue
    return parsed


def get_metrics(project_key: str, token: str) -> dict:
    """Pull metrics from SonarQube API.

    Returns:
        A dict mapping metric key to float value, or {"error": message}
        when the request fails or the server returns no usable measures.
    """
    query = urllib.parse.urlencode(
        {"component": project_key, "metricKeys": ",".join(METRIC_KEYS)}
    )
    url = f"{SONAR_URL}/api/measures/component?{query}"

    try:
        payload = _fetch_json(url, _auth_headers(token), timeout=10)
    except _REQUEST_ERRORS as exc:
        return {"error": str(exc)}

    measures = parse_measures(payload)
    if not measures:
        # Without this, an empty result would score as a perfect 1.0.
        return {"error": f"no measures returned for project {project_key}"}
    return measures


def compute_score(metrics: dict) -> float:
    """Compute a 0-1 score from SonarQube metrics.

    Starts from 100 and subtracts capped penalties; missing metrics count
    as 0. A metrics dict containing "error" scores 0.0.
    """
    if "error" in metrics:
        return 0.0

    score = 100.0

    # Bugs: -15 each, max -30
    score -= min(metrics.get("bugs", 0) * 15, 30)

    # Code smells: -3 each, max -20
    score -= min(metrics.get("code_smells", 0) * 3, 20)

    # Vulnerabilities: -20 each, max -40
    score -= min(metrics.get("vulnerabilities", 0) * 20, 40)

    # Cognitive complexity: penalty above 50
    complexity = metrics.get("cognitive_complexity", 0)
    if complexity > 100:
        score -= 15
    elif complexity > 50:
        score -= 5

    # Duplication: penalty above 5%
    duplication = metrics.get("duplicated_lines_density", 0)
    if duplication > 10:
        score -= 10
    elif duplication > 5:
        score -= 5

    return max(0.0, min(100.0, score)) / 100.0


def build_report(metrics: dict, score: float) -> dict:
    """Assemble the JSON-serializable report printed by main().

    When *metrics* carries an "error" entry, the message is included in the
    report under "error" so it is not silently reduced to zeros.
    """
    report = {
        "bugs": int(metrics.get("bugs", 0)),
        "vulnerabilities": int(metrics.get("vulnerabilities", 0)),
        "code_smells": int(metrics.get("code_smells", 0)),
        "cognitive_complexity": int(metrics.get("cognitive_complexity", 0)),
        "lines_of_code": int(metrics.get("ncloc", 0)),
        "duplication_pct": metrics.get("duplicated_lines_density", 0),
        "tech_debt_minutes": int(metrics.get("sqale_index", 0)),
        "maintainability": RATING_LABELS.get(metrics.get("sqale_rating", 0), "?"),
        "reliability": RATING_LABELS.get(metrics.get("reliability_rating", 0), "?"),
        "security": RATING_LABELS.get(metrics.get("security_rating", 0), "?"),
        "score": round(score, 2),
    }
    if "error" in metrics:
        report["error"] = metrics["error"]
    return report


def main():
    """Scan the workspace, fetch metrics and print the report as JSON.

    Reads the workspace path from argv[1] (default ".") and the project key
    from argv[2] (default "tetris-eval"). Every failure path prints a JSON
    object with "error" and "score": 0 instead of raising.
    """
    workspace = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(".")
    project_key = sys.argv[2] if len(sys.argv) > 2 else "tetris-eval"

    token = get_token()
    if not token:
        print(json.dumps({"error": "no SonarQube token found", "score": 0}))
        return

    # Check if SonarQube is running
    try:
        with urllib.request.urlopen(f"{SONAR_URL}/api/system/status", timeout=3):
            pass
    except (OSError, http.client.HTTPException):
        print(json.dumps({"error": "SonarQube not running at localhost:9000", "score": 0}))
        return

    # Run scan
    if not scan(workspace, project_key, token):
        print(json.dumps({"error": "sonar-scanner failed", "score": 0}))
        return

    # Wait for processing
    analysis_done = wait_for_analysis(project_key, token)

    # Get metrics
    metrics = get_metrics(project_key, token)
    score = compute_score(metrics)

    result = build_report(metrics, score)
    if not analysis_done:
        # Still report what the server has, but flag that it may be stale.
        result["warning"] = "analysis did not finish in time; metrics may be stale"

    print(json.dumps(result, indent=2))


if __name__ == "__main__":
    main()