profile-worker.js (12218B)
1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 const { parentPort, workerData } = require("worker_threads"); 6 const fs = require("fs"); 7 const path = require("path"); 8 const zlib = require("zlib"); 9 10 // Normalize failure messages to remove task-specific and time-specific information 11 function normalizeMessage(message) { 12 return message 13 ?.replace(/task_\d+/g, "task_id") 14 .replace(/\nRejection date: [^\n]+/g, ""); 15 } 16 17 // Extract parallel execution time ranges from markers 18 function extractParallelRanges(markers) { 19 const parallelRanges = []; 20 21 for (let i = 0; i < markers.length; i++) { 22 const data = markers.data[i]; 23 // Look for markers with type: "Text" and text: "parallel" 24 if (data?.type === "Text" && data.text === "parallel") { 25 parallelRanges.push({ 26 start: markers.startTime[i], 27 end: markers.endTime[i], 28 }); 29 } 30 } 31 32 return parallelRanges; 33 } 34 35 // Check if a test time overlaps with any parallel execution range 36 function isInParallelRange(testStart, testEnd, parallelRanges) { 37 for (const range of parallelRanges) { 38 // Check if test overlaps with parallel range 39 if (testStart < range.end && testEnd > range.start) { 40 return true; 41 } 42 } 43 return false; 44 } 45 46 // Extract resource usage information from profile 47 function extractResourceUsage(profile) { 48 if (!profile || !profile.threads || !profile.threads[0]) { 49 return null; 50 } 51 52 const thread = profile.threads[0]; 53 const { markers } = thread; 54 55 if (!markers || !markers.data) { 56 return null; 57 } 58 59 // Extract machine info from profile metadata 60 // Convert memory to GB with 1 decimal place to avoid grouping issues from tiny variations 61 const machineInfo = { 62 logicalCPUs: profile.meta?.logicalCPUs || null, 63 physicalCPUs: profile.meta?.physicalCPUs || null, 64 mainMemory: profile.meta?.mainMemory 65 ? parseFloat((profile.meta.mainMemory / (1024 * 1024 * 1024)).toFixed(1)) 66 : null, 67 }; 68 69 let maxMemory = 0; 70 let idleTime = 0; 71 let singleCoreTime = 0; 72 // CPU buckets: [0-10%, 10-20%, 20-30%, ..., 90-100%] 73 const cpuBuckets = new Array(10).fill(0); 74 75 // Calculate thresholds based on core count 76 const oneCorePct = machineInfo.logicalCPUs 77 ? 100 / machineInfo.logicalCPUs 78 : 12.5; 79 const idleThreshold = oneCorePct / 2; 80 // Single-core range: 0.75 to 1.25 cores (to account for slight variations) 81 const singleCoreMin = oneCorePct * 0.75; 82 const singleCoreMax = oneCorePct * 1.25; 83 84 // Process markers to gather resource usage 85 for (let i = 0; i < markers.length; i++) { 86 const data = markers.data[i]; 87 if (!data) { 88 continue; 89 } 90 91 const duration = markers.endTime[i] - markers.startTime[i]; 92 93 if (data.type === "Mem") { 94 if (data.used > maxMemory) { 95 maxMemory = data.used; 96 } 97 } else if (data.type === "CPU") { 98 // Parse CPU percentage (e.g., "21.4%" -> 21.4) 99 const cpuPercent = parseFloat(data.cpuPercent); 100 if (isNaN(cpuPercent)) { 101 continue; 102 } 103 104 if (cpuPercent < idleThreshold) { 105 idleTime += duration; 106 } 107 108 // Check if it's in the single-core range 109 if (cpuPercent >= singleCoreMin && cpuPercent <= singleCoreMax) { 110 singleCoreTime += duration; 111 } 112 113 // Compute bucket index: 0-10% -> bucket 0, 10-20% -> bucket 1, etc. 114 const bucketIndex = Math.min(Math.floor(cpuPercent / 10), 9); 115 cpuBuckets[bucketIndex] += duration; 116 } 117 } 118 119 return { 120 machineInfo, 121 maxMemory, 122 idleTime, 123 singleCoreTime, 124 cpuBuckets, 125 }; 126 } 127 128 // Extract test timings from profile 129 // eslint-disable-next-line complexity 130 function extractTestTimings(profile) { 131 if (!profile || !profile.threads || !profile.threads[0]) { 132 return []; 133 } 134 135 const thread = profile.threads[0]; 136 const { markers, stringArray } = thread; 137 138 if (!markers || !markers.data || !markers.name || !stringArray) { 139 return []; 140 } 141 142 // First, extract parallel execution ranges 143 const parallelRanges = extractParallelRanges(markers); 144 145 // Extract crash markers for later matching with CRASH status tests 146 const crashMarkers = []; 147 for (let i = 0; i < markers.length; i++) { 148 const data = markers.data[i]; 149 if (data?.type !== "Crash" || !data.test) { 150 continue; 151 } 152 crashMarkers.push({ 153 testPath: data.test, 154 startTime: markers.startTime[i], 155 signature: data.signature || null, 156 minidump: data.minidump || null, 157 }); 158 } 159 160 // Extract TestStatus markers (FAIL, ERROR) for failure messages 161 const failStringId = stringArray.indexOf("FAIL"); 162 const errorStringId = stringArray.indexOf("ERROR"); 163 const testStatusMarkers = []; 164 165 for (let i = 0; i < markers.length; i++) { 166 const nameId = markers.name[i]; 167 if (nameId !== failStringId && nameId !== errorStringId) { 168 continue; 169 } 170 const data = markers.data[i]; 171 if (!data || data.type !== "TestStatus" || !data.test) { 172 continue; 173 } 174 175 testStatusMarkers.push({ 176 test: data.test, 177 nameId, 178 time: markers.startTime[i], 179 message: normalizeMessage(data.message), 180 }); 181 } 182 183 // Sort TestStatus markers by test and then time for efficient lookup 184 testStatusMarkers.sort( 185 (a, b) => a.test.localeCompare(b.test) || a.time - b.time 186 ); 187 188 const testStringId = stringArray.indexOf("test"); 189 const timings = []; 190 191 for (let i = 0; i < markers.length; i++) { 192 if (markers.name[i] !== testStringId) { 193 continue; 194 } 195 196 const data = markers.data[i]; 197 if (!data) { 198 continue; 199 } 200 201 let testPath = null; 202 let status = "UNKNOWN"; 203 let message = null; 204 205 // Handle both structured and plain text logs 206 if (data.type === "Test") { 207 // Structured log format 208 const fullTestId = data.test || data.name; 209 testPath = fullTestId; 210 status = data.status || "UNKNOWN"; 211 // Normalize line breaks in message (convert \r\n to \n) and apply normalizations 212 message = normalizeMessage( 213 data.message ? data.message.replace(/\r\n/g, "\n") : null 214 ); 215 216 // Check if this is an expected failure (FAIL status but green color) 217 if (status === "FAIL" && data.color === "green") { 218 status = "EXPECTED-FAIL"; 219 } 220 // Add execution context suffix to timeout, fail, and pass statuses 221 else if ( 222 ["TIMEOUT", "FAIL", "PASS"].includes(status) && 223 parallelRanges.length 224 ) { 225 status += isInParallelRange( 226 markers.startTime[i], 227 markers.endTime[i], 228 parallelRanges 229 ) 230 ? "-PARALLEL" 231 : "-SEQUENTIAL"; 232 } 233 // Keep other statuses as-is 234 235 // For failure statuses, look up the message from TestStatus markers 236 if (status.startsWith("FAIL")) { 237 const testStartTime = markers.startTime[i]; 238 const statusMarker = testStatusMarkers.find( 239 m => m.test === fullTestId && m.time >= testStartTime 240 ); 241 if (statusMarker && statusMarker.message) { 242 message = statusMarker.message; 243 } 244 } 245 246 // Extract the actual test file path from the test field 247 // Format: "xpcshell-parent-process.toml:dom/indexedDB/test/unit/test_fileListUpgrade.js" 248 if (testPath && testPath.includes(":")) { 249 testPath = testPath.split(":")[1]; 250 } 251 } else if (data.type === "Text") { 252 // Plain text log format 253 testPath = data.text; 254 255 // Skip text markers like "replaying full log for ..." 256 if (testPath?.startsWith("replaying full log for ")) { 257 continue; 258 } 259 260 // We don't have status information in markers from plain text logs 261 status = "UNKNOWN"; 262 } else { 263 continue; 264 } 265 266 if (!testPath || !testPath.endsWith(".js")) { 267 continue; 268 } 269 270 const testStartTime = markers.startTime[i]; 271 const testEndTime = markers.endTime[i]; 272 273 const timing = { 274 path: testPath, 275 duration: testEndTime - testStartTime, 276 status, 277 timestamp: profile.meta.startTime + testStartTime, 278 }; 279 if (message) { 280 timing.message = message; 281 } 282 283 // For CRASH status, find matching crash marker within the test's time range 284 if (status === "CRASH") { 285 const matchingCrash = crashMarkers.find( 286 crash => 287 crash.testPath === data.test && 288 crash.startTime >= testStartTime && 289 crash.startTime <= testEndTime 290 ); 291 if (matchingCrash) { 292 if (matchingCrash.signature) { 293 timing.crashSignature = matchingCrash.signature; 294 } 295 if (matchingCrash.minidump) { 296 timing.minidump = matchingCrash.minidump; 297 } 298 } 299 } 300 301 timings.push(timing); 302 } 303 304 return timings; 305 } 306 307 // Fetch resource profile from TaskCluster with local caching 308 async function fetchResourceProfile(taskId, retryId = 0) { 309 const cacheFileGz = path.join( 310 workerData.profileCacheDir, 311 `${taskId}-${retryId}.json.gz` 312 ); 313 314 // Check if we have a cached gzipped version 315 if (fs.existsSync(cacheFileGz)) { 316 try { 317 const compressedData = fs.readFileSync(cacheFileGz); 318 const decompressedData = zlib.gunzipSync(compressedData); 319 return JSON.parse(decompressedData.toString("utf-8")); 320 } catch (error) { 321 console.warn( 322 `Error reading cached gzipped profile ${taskId}: ${error.message}` 323 ); 324 // Continue to fetch from network 325 } 326 } 327 328 const url = `${workerData.taskclusterBaseUrl}/api/queue/v1/task/${taskId}/runs/${retryId}/artifacts/public/test_info/profile_resource-usage.json`; 329 330 try { 331 const response = await fetch(url); 332 if (!response.ok) { 333 return null; 334 } 335 336 const profile = await response.json(); 337 338 // Cache the profile for future use (gzipped) 339 try { 340 const compressed = zlib.gzipSync(JSON.stringify(profile)); 341 fs.writeFileSync(cacheFileGz, compressed); 342 } catch (error) { 343 console.warn(`Error caching profile ${taskId}: ${error.message}`); 344 } 345 346 return profile; 347 } catch (error) { 348 console.error(`Error fetching profile for task ${taskId}:`, error.message); 349 return null; 350 } 351 } 352 353 // Process a single job to extract test timings 354 async function processJob(job) { 355 const taskId = job.task_id; 356 const retryId = job.retry_id || 0; 357 const jobName = job.name; 358 359 if (!taskId) { 360 return null; 361 } 362 363 // Processing job silently to avoid mixed output with main thread 364 365 const profile = await fetchResourceProfile(taskId, retryId); 366 if (!profile) { 367 return null; 368 } 369 370 const timings = extractTestTimings(profile); 371 if (timings.length === 0) { 372 return null; 373 } 374 375 const resourceUsage = extractResourceUsage(profile); 376 377 // Extract commit ID from profile.meta.sourceURL 378 // Format: "https://hg.mozilla.org/integration/autoland/rev/f37a6863f87aeeb870b16223045ea7614b1ba0a7" 379 let commitId = null; 380 if (profile.meta.sourceURL) { 381 const match = profile.meta.sourceURL.match(/\/rev\/([a-f0-9]+)$/i); 382 if (match) { 383 commitId = match[1]; 384 } 385 } 386 387 // Convert start_time to timestamp in seconds if it's a string 388 const startTime = 389 typeof job.start_time === "string" 390 ? Math.floor(new Date(job.start_time).getTime() / 1000) 391 : job.start_time; 392 393 return { 394 jobName, 395 taskId, 396 retryId, 397 repository: job.repository, 398 startTime, 399 timings, 400 resourceUsage, 401 commitId, 402 }; 403 } 404 405 // Main worker function 406 async function main() { 407 try { 408 const results = []; 409 410 // Signal worker is ready for jobs 411 parentPort.postMessage({ type: "ready" }); 412 413 // Listen for job assignments 414 parentPort.on("message", async message => { 415 if (message.type === "job") { 416 const result = await processJob(message.job); 417 if (result) { 418 results.push(result); 419 } 420 // Request next job 421 parentPort.postMessage({ type: "jobComplete", result }); 422 } else if (message.type === "shutdown") { 423 // Send final results and exit 424 parentPort.postMessage({ type: "finished", results }); 425 } 426 }); 427 } catch (error) { 428 parentPort.postMessage({ type: "error", error: error.message }); 429 } 430 } 431 432 main();