common.py (18932B)
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common methods and variables used by Cr-Fuchsia testing infrastructure."""

import ipaddress
import json
import logging
import os
import signal
import shutil
import socket
import subprocess
import sys
import time

from argparse import ArgumentParser
from typing import Iterable, List, Optional, Tuple
from dataclasses import dataclass

from compatible_utils import get_ssh_prefix, get_host_arch


def _find_src_root() -> str:
    """Locate the root of the src folder.

    Returns:
        The value of the SRC_ROOT environment variable when it is set to a
        non-empty string; otherwise the directory three levels above the
        directory containing this file. The result is not normalized.
    """
    env_root = os.environ.get('SRC_ROOT')
    if env_root:
        return env_root
    this_dir = os.path.dirname(__file__)
    return os.path.join(this_dir, os.pardir, os.pardir, os.pardir)


# The absolute path of the root folder to work on. It may not always be the
# src folder since there may not be source code at all, but it's expected to
# have folders like third_party/fuchsia-sdk in it.
DIR_SRC_ROOT = os.path.abspath(_find_src_root())


def _find_fuchsia_images_root() -> str:
    """Define the root of the fuchsia images.

    FUCHSIA_IMAGES_ROOT, when set to a non-empty value, overrides the default
    location under DIR_SRC_ROOT.
    """
    if os.environ.get('FUCHSIA_IMAGES_ROOT'):
        return os.environ['FUCHSIA_IMAGES_ROOT']
    return os.path.join(DIR_SRC_ROOT, 'third_party', 'fuchsia-sdk', 'images')


IMAGES_ROOT = os.path.abspath(_find_fuchsia_images_root())


def _find_fuchsia_internal_images_root() -> str:
    """Define the root of the fuchsia internal images.

    FUCHSIA_INTERNAL_IMAGES_ROOT, when set to a non-empty value, overrides the
    default, which is IMAGES_ROOT with an '-internal' suffix.
    """
    if os.environ.get('FUCHSIA_INTERNAL_IMAGES_ROOT'):
        return os.environ['FUCHSIA_INTERNAL_IMAGES_ROOT']
    return IMAGES_ROOT + '-internal'


INTERNAL_IMAGES_ROOT = os.path.abspath(_find_fuchsia_internal_images_root())

REPO_ALIAS = 'fuchsia.com'


def _find_fuchsia_sdk_root() -> str:
    """Define the root of the fuchsia sdk.

    FUCHSIA_SDK_ROOT, when set to a non-empty value, overrides the default
    location under DIR_SRC_ROOT.
    """
    if os.environ.get('FUCHSIA_SDK_ROOT'):
        return os.environ['FUCHSIA_SDK_ROOT']
    return os.path.join(DIR_SRC_ROOT, 'third_party', 'fuchsia-sdk', 'sdk')


SDK_ROOT = os.path.abspath(_find_fuchsia_sdk_root())


def _find_fuchsia_gn_sdk_root() -> str:
    """Define the root of the fuchsia gn sdk.

    FUCHSIA_GN_SDK_ROOT, when set to a non-empty value, overrides the default
    location under DIR_SRC_ROOT.
    """
    if os.environ.get('FUCHSIA_GN_SDK_ROOT'):
        return os.environ['FUCHSIA_GN_SDK_ROOT']
    return os.path.join(DIR_SRC_ROOT, 'third_party', 'fuchsia-gn-sdk', 'src')


GN_SDK_ROOT = os.path.abspath(_find_fuchsia_gn_sdk_root())

SDK_TOOLS_DIR = os.path.join(SDK_ROOT, 'tools', get_host_arch())
_FFX_TOOL = os.path.join(SDK_TOOLS_DIR, 'ffx')
_FFX_ISOLATE_DIR = 'FFX_ISOLATE_DIR'


def set_ffx_isolate_dir(isolate_dir: str) -> None:
    """Sets the global environment so the following ffx calls will have the
    isolate dir being carried."""
    assert not has_ffx_isolate_dir(), 'The isolate dir is already set.'
    os.environ[_FFX_ISOLATE_DIR] = isolate_dir


def get_ffx_isolate_dir() -> str:
    """Returns the global environment of the isolate dir of ffx. This function
    should only be called after set_ffx_isolate_dir.

    Raises:
        KeyError: if the isolate dir has not been set in the environment.
    """
    return os.environ[_FFX_ISOLATE_DIR]


def has_ffx_isolate_dir() -> bool:
    """Returns whether the isolate dir of ffx is set."""
    return _FFX_ISOLATE_DIR in os.environ


def get_hash_from_sdk():
    """Retrieve version info from the SDK.

    Returns:
        The 'id' entry of meta/manifest.json under SDK_ROOT.
    """

    version_file = os.path.join(SDK_ROOT, 'meta', 'manifest.json')
    assert os.path.exists(version_file), \
           'Could not detect version file. Make sure the SDK is downloaded.'
    with open(version_file, 'r') as f:
        return json.load(f)['id']


def get_host_tool_path(tool):
    """Get a tool from the SDK.

    Returns:
        The path of |tool| inside SDK_TOOLS_DIR; existence is not checked.
    """

    return os.path.join(SDK_TOOLS_DIR, tool)


def get_host_os():
    """Get host operating system.

    Returns:
        'linux' or 'mac'.
    Raises:
        Exception: if sys.platform is neither linux nor darwin.
    """

    host_platform = sys.platform
    if host_platform.startswith('linux'):
        return 'linux'
    if host_platform.startswith('darwin'):
        return 'mac'
    raise Exception(f'Unsupported host platform: {host_platform}')


def make_clean_directory(directory_name):
    """If the directory exists, delete it and remake with no contents."""

    if os.path.exists(directory_name):
        shutil.rmtree(directory_name)
    os.makedirs(directory_name)


def _get_daemon_status():
    """Determines daemon status via `ffx daemon socket`.

    Returns:
        dict of status of the socket. Status will have a key Running or
        NotRunning to indicate if the daemon is running.
    """
    status = json.loads(
        run_ffx_command(cmd=('daemon', 'socket'),
                        capture_output=True,
                        json_out=True).stdout.strip())
    # A missing 'pid' or 'status' entry is treated as "not running".
    return status.get('pid', {}).get('status', {'NotRunning': True})


def is_daemon_running() -> bool:
    """Returns if the daemon is running."""
    return 'Running' in _get_daemon_status()


def _wait_for_daemon(start=True, timeout_seconds=100):
    """Waits for daemon to reach desired state in a polling loop.

    Sleeps for 5s between polls. The daemon is always polled at least once,
    and no sleep happens after the final poll.

    Args:
        start: bool. Indicates to wait for daemon to start up. If False,
            indicates waiting for daemon to die.
        timeout_seconds: int. Number of seconds to wait for the daemon to reach
            the desired status.
    Raises:
        TimeoutError: if the daemon does not reach the desired state in time.
    """
    wanted_status = 'start' if start else 'stop'
    sleep_period_seconds = 5
    # Guarantee at least one poll even when the timeout is shorter than the
    # sleep period.
    attempts = max(1, int(timeout_seconds / sleep_period_seconds))
    for i in range(attempts):
        if is_daemon_running() == start:
            return
        # Sleeping after the last poll would only delay the TimeoutError.
        if i < attempts - 1:
            logging.info('Waiting for daemon to %s...', wanted_status)
            time.sleep(sleep_period_seconds)

    raise TimeoutError(f'Daemon did not {wanted_status} in time.')


# The following two functions are the temporary work around before
# https://fxbug.dev/92296 and https://fxbug.dev/125873 are being fixed.
def start_ffx_daemon():
    """Starts the ffx daemon by using doctor --restart-daemon since daemon start
    blocks the current shell.

    Note, doctor --restart-daemon usually fails since the timeout in ffx is
    short and won't be sufficient to wait for the daemon to really start.

    Also, doctor --restart-daemon always restarts the daemon, so this function
    should be used with caution unless it's really needed to "restart" the
    daemon by explicitly calling stop daemon first.
    """
    assert not is_daemon_running(), "Call stop_ffx_daemon first."
    # check=False: the command is expected to fail frequently, see above.
    run_ffx_command(cmd=('doctor', '--restart-daemon'), check=False)
    _wait_for_daemon(start=True)


def stop_ffx_daemon():
    """Stops the ffx daemon"""
    run_ffx_command(cmd=('daemon', 'stop', '-t', '10000'))
    _wait_for_daemon(start=False)


def run_ffx_command(check: bool = True,
                    capture_output: Optional[bool] = None,
                    timeout: Optional[int] = None,
                    **kwargs) -> subprocess.CompletedProcess:
    """Runs `ffx` with the given arguments, waiting for it to exit.

    **
    The arguments below are named after |subprocess.run| arguments. They are
    overloaded to avoid them from being forwarded to |subprocess.Popen|.
    **
    See run_continuous_ffx_command for additional arguments.
    Args:
        check: If True, CalledProcessError is raised if ffx returns a non-zero
            exit code.
        capture_output: Whether to capture both stdout/stderr.
        timeout: Optional timeout (in seconds). subprocess.TimeoutExpired is
            raised if the process does not complete in the timeout period; the
            ffx process is killed before the exception propagates.
    Returns:
        A CompletedProcess instance
    Raises:
        CalledProcessError if |check| is true.
        subprocess.TimeoutExpired if |timeout| elapses.
    """
    if capture_output:
        kwargs['stdout'] = subprocess.PIPE
        kwargs['stderr'] = subprocess.PIPE
    proc = run_continuous_ffx_command(**kwargs)
    try:
        # NOTE(review): the 'stdin' kwarg is forwarded both to Popen (via
        # **kwargs) and to communicate() as input; kept as-is, confirm intent.
        stdout, stderr = proc.communicate(input=kwargs.get('stdin'),
                                          timeout=timeout)
    except subprocess.TimeoutExpired:
        # communicate() does not kill the child on timeout; do it here so the
        # ffx process is not leaked, then reap it.
        proc.kill()
        proc.communicate()
        raise
    completed_proc = subprocess.CompletedProcess(args=proc.args,
                                                 returncode=proc.returncode,
                                                 stdout=stdout,
                                                 stderr=stderr)
    if not check:
        return completed_proc
    try:
        completed_proc.check_returncode()
    except subprocess.CalledProcessError as cpe:
        logging.error('%s %s failed with returncode %s.',
                      os.path.relpath(_FFX_TOOL),
                      subprocess.list2cmdline(proc.args[1:]), cpe.returncode)
        if cpe.stdout:
            logging.error('stdout of the command: %s', cpe.stdout)
        if cpe.stderr:
            logging.error('stderr of the command: %s', cpe.stderr)
        raise
    return completed_proc


def run_continuous_ffx_command(cmd: Iterable[str],
                               target_id: Optional[str] = None,
                               configs: Optional[List[str]] = None,
                               json_out: bool = False,
                               encoding: Optional[str] = 'utf-8',
                               **kwargs) -> subprocess.Popen:
    """Runs `ffx` with the given arguments, returning immediately.

    Args:
        cmd: A sequence of arguments to ffx.
        target_id: Whether to execute the command for a specific target. The
            target_id could be in the form of a nodename or an address.
        configs: A list of configs to be applied to the current command.
        json_out: Have command output returned as JSON. Must be parsed by
            caller.
        encoding: Optional, desired encoding for output/stderr pipes.
        **kwargs: Forwarded unchanged to subprocess.Popen.
    Returns:
        A subprocess.Popen instance
    """

    # Global ffx options go before the subcommand in |cmd|.
    ffx_cmd = [_FFX_TOOL]
    if json_out:
        ffx_cmd.extend(('--machine', 'json'))
    if target_id:
        ffx_cmd.extend(('--target', target_id))
    if configs:
        for config in configs:
            ffx_cmd.extend(('--config', config))
    ffx_cmd.extend(cmd)

    return subprocess.Popen(ffx_cmd, encoding=encoding, **kwargs)


def read_package_paths(out_dir: str, pkg_name: str) -> List[str]:
    """Reads the package metadata generated for |pkg_name| in |out_dir|.

    Returns:
        A list of the absolute path to all FAR files the package depends on.
    """
    with open(
            os.path.join(DIR_SRC_ROOT, out_dir, 'gen', 'package_metadata',
                         f'{pkg_name}.meta')) as meta_file:
        data = json.load(meta_file)
    return [
        os.path.join(DIR_SRC_ROOT, out_dir, package)
        for package in data['packages']
    ]


def register_common_args(parser: ArgumentParser) -> None:
    """Register commonly used arguments."""
    common_args = parser.add_argument_group('common', 'common arguments')
    common_args.add_argument(
        '--out-dir',
        '-C',
        type=os.path.realpath,
        help='Path to the directory in which build files are located. ')


def register_device_args(parser: ArgumentParser) -> None:
    """Register device arguments.

    The default of --target-id is taken from the FUCHSIA_NODENAME environment
    variable.
    """
    device_args = parser.add_argument_group('device', 'device arguments')
    device_args.add_argument('--target-id',
                             default=os.environ.get('FUCHSIA_NODENAME'),
                             help=('Specify the target device. This could be '
                                   'a node-name (e.g. fuchsia-emulator) or '
                                   'an ip address along with an optional port '
                                   '(e.g. [fe80::e1c4:fd22:5ee5:878e]:22222, '
                                   '1.2.3.4, 1.2.3.4:33333). If unspecified, '
                                   'the default target in ffx will be used.'))


def register_log_args(parser: ArgumentParser) -> None:
    """Register logging arguments."""

    log_args = parser.add_argument_group('logging', 'logging arguments')
    log_args.add_argument('--logs-dir',
                          type=os.path.realpath,
                          help=('Directory to write logs to.'))


def get_component_uri(package: str) -> str:
    """Retrieve the uri for a package."""
    # If the input is a full package already, do nothing
    if package.startswith('fuchsia-pkg://'):
        return package
    return f'fuchsia-pkg://{REPO_ALIAS}/{package}#meta/{package}.cm'


def ssh_run(cmd: List[str],
            target_id: Optional[str],
            check=True,
            **kwargs) -> subprocess.CompletedProcess:
    """Runs a command on the |target_id| via ssh.

    Extra keyword arguments are forwarded to subprocess.run.
    """
    ssh_prefix = get_ssh_prefix(get_ssh_address(target_id))
    return subprocess.run(ssh_prefix + ['--'] + cmd, check=check, **kwargs)


def resolve_packages(packages: List[str], target_id: Optional[str]) -> None:
    """Ensure that all |packages| are installed on a device.

    Raises:
        CalledProcessError: if a package still fails to resolve after the
            retries.
    """

    # A temporary solution to avoid cycle dependency. The DIR_SRC_ROOT should be
    # moved away from common.py.
    # pylint: disable=cyclic-import, import-outside-toplevel
    import monitors

    with monitors.time_consumption('pkgctl', 'gc'):
        ssh_run(['pkgctl', 'gc'], target_id, check=False)

    def _retry_resolve(package) -> None:
        """Helper function for retrying a subprocess.run command."""

        cmd = ['pkgctl', 'resolve', f'fuchsia-pkg://{REPO_ALIAS}/{package}']
        retry_counter = monitors.count('pkgctl', 'resolve', package, 'retry')
        # Up to four tolerant attempts, 3s apart; each failure is recorded.
        for _ in range(4):
            proc = ssh_run(cmd, target_id=target_id, check=False)
            if proc.returncode == 0:
                return
            time.sleep(3)
            retry_counter.record(1)
        # Final attempt raises on failure.
        ssh_run(cmd, target_id=target_id, check=True)

    for package in packages:
        with monitors.time_consumption('pkgctl', 'resolve', package):
            _retry_resolve(package)


def get_ip_address(target_id: Optional[str], ipv4_only: bool = False):
    """Determines address of the given target; returns the value from
    ipaddress.ip_address."""
    return ipaddress.ip_address(get_ssh_address(target_id, ipv4_only)[0])


def get_ssh_address(target_id: Optional[str],
                    ipv4_only: bool = False) -> Tuple[str, int]:
    """Determines SSH address for given target.

    Returns:
        A (ip, ssh_port) tuple built from the first address of the first
        target reported by `ffx target list`.
    """
    cmd = ['target', 'list']
    if ipv4_only:
        cmd.append('--no-ipv6')
    if target_id:
        # target list does not respect -t / --target flag.
        cmd.append(target_id)
    target = json.loads(
        run_ffx_command(cmd=cmd, json_out=True,
                        capture_output=True).stdout.strip())
    addr = target[0]['addresses'][0]
    # The address record may be wrapped in an 'Ip' key; unwrap it if so.
    if 'Ip' in addr:
        addr = addr['Ip']
    ssh_port = int(addr['ssh_port'])
    if ssh_port == 0:
        # Returning an unset ssh_port means the default port 22.
        ssh_port = 22
    return (addr['ip'], ssh_port)


def find_in_dir(target_name: str, parent_dir: str) -> Optional[str]:
    """Finds path in SDK.

    Args:
      target_name: Name of target to find, as a string.
      parent_dir: Directory to start search in.

    Returns:
      Full path to the target, None if not found.
    """
    # Doesn't make sense to look for a full path. Only extract the basename.
    target_name = os.path.basename(target_name)
    # Only directory names are matched, never regular files.
    for root, dirs, _ in os.walk(parent_dir):
        if target_name in dirs:
            return os.path.abspath(os.path.join(root, target_name))

    return None


def find_image_in_sdk(product_name: str) -> Optional[str]:
    """Finds image dir in SDK for product given.

    Args:
      product_name: Name of product's image directory to find.

    Returns:
      Full path to the target, None if not found.
    """
    top_image_dir = os.path.join(SDK_ROOT, os.pardir, 'images')
    path = find_in_dir(product_name, parent_dir=top_image_dir)
    if path:
        # The result is the 'images' directory nested under the product dir.
        return find_in_dir('images', parent_dir=path)
    return path


def catch_sigterm() -> None:
    """Catches the kill signal and allows the process to exit cleanly."""
    def _sigterm_handler(*_):
        sys.exit(0)

    signal.signal(signal.SIGTERM, _sigterm_handler)


def wait_for_sigterm(extra_msg: str = '') -> None:
    """
    Spin-wait for either ctrl+c or sigterm. Caller can use try-finally
    statement to perform extra cleanup.

    Args:
      extra_msg: The extra message to be logged.
    """
    try:
        while True:
            # We do expect receiving either ctrl+c or sigterm, so this line
            # literally means sleep forever.
            time.sleep(10000)
    except KeyboardInterrupt:
        logging.info('Ctrl-C received; %s', extra_msg)
    except SystemExit:
        logging.info('SIGTERM received; %s', extra_msg)


@dataclass
class BuildInfo:
    """A structure replica of the output of build section in `ffx target show`.
    """
    version: Optional[str] = None
    product: Optional[str] = None
    board: Optional[str] = None
    commit: Optional[str] = None


def get_build_info(target: Optional[str] = None) -> Optional[BuildInfo]:
    """Retrieves build info from the device.

    Keys of the build section that are not BuildInfo fields are ignored.

    Returns:
        A BuildInfo struct, or None if anything goes wrong.
        Any field in BuildInfo can be None to indicate the missing of the field.
    """
    # Only `dataclass` is imported at file level; keep the extra name local.
    # pylint: disable=import-outside-toplevel
    from dataclasses import fields

    info_cmd = run_ffx_command(cmd=('--machine', 'json', 'target', 'show'),
                               target_id=target,
                               capture_output=True,
                               check=False)
    # If the information was not retrieved, return None to indicate unknown
    # system info.
    if info_cmd.returncode != 0:
        logging.error('ffx target show returns %d', info_cmd.returncode)
        return None
    try:
        info_json = json.loads(info_cmd.stdout.strip())
    except json.decoder.JSONDecodeError as error:
        logging.error('Unexpected json string: %s, exception: %s',
                      info_cmd.stdout, error)
        return None
    build = info_json.get('build') if isinstance(info_json, dict) else None
    if not isinstance(build, dict):
        return None
    # Drop unknown keys so an extra entry in the ffx output cannot raise
    # TypeError from the BuildInfo constructor.
    known_fields = {field.name for field in fields(BuildInfo)}
    return BuildInfo(
        **{key: value
           for key, value in build.items() if key in known_fields})


def get_system_info(target: Optional[str] = None) -> Tuple[str, str]:
    """Retrieves installed OS version from the device.

    Returns:
        Tuple of strings, containing (product, version number), or a pair of
        empty strings to indicate an error.
    """
    build_info = get_build_info(target)
    if not build_info:
        return ('', '')

    return (build_info.product or '', build_info.version or '')


def get_free_local_port() -> int:
    """Returns an ipv4 port available locally. It does not reserve the port and
    may cause race condition. Copied from catapult
    https://crsrc.org/c/third_party/catapult/telemetry/telemetry/core/util.py;drc=e3f9ae73db5135ad998108113af7ef82a47efc51;l=61"""
    # AF_INET restricts port to IPv4 addresses.
    # SOCK_STREAM means that it is a TCP socket.
    # The context manager closes the socket even if bind() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tmp:
        # Setting SOL_SOCKET + SO_REUSEADDR to 1 allows the reuse of local
        # addresses, this is so sockets do not fail to bind for being in the
        # CLOSE_WAIT state.
        tmp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tmp.bind(('', 0))
        port = tmp.getsockname()[1]

    return port