tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

fast_local_dev_server.py (37130B)


      1 #!/usr/bin/env python3
      2 # Copyright 2021 The Chromium Authors
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 """Creates an server to offload non-critical-path GN targets."""
      6 
      7 from __future__ import annotations
      8 
      9 import argparse
     10 import collections
     11 import contextlib
     12 import dataclasses
     13 import datetime
     14 import os
     15 import pathlib
     16 import re
     17 import signal
     18 import shlex
     19 import shutil
     20 import socket
     21 import subprocess
     22 import sys
     23 import threading
     24 import traceback
     25 import time
     26 from typing import Callable, Dict, List, Optional, Tuple, IO
     27 
     28 sys.path.append(os.path.join(os.path.dirname(__file__), 'gyp'))
     29 from util import server_utils
     30 
     31 _SOCKET_TIMEOUT = 60  # seconds
     32 
     33 _LOGFILE_NAME = 'buildserver.log'
     34 _MAX_LOGFILES = 6
     35 
     36 FIRST_LOG_LINE = """\
     37 #### Start of log for build: {build_id}
     38 #### CWD: {outdir}
     39 """
     40 BUILD_ID_RE = re.compile(r'^#### Start of log for build: (?P<build_id>.+)')
     41 
     42 
     43 def server_log(msg: str):
     44  if OptionsManager.is_quiet():
     45    return
     46  # Ensure we start our message on a new line.
     47  print('\n' + msg)
     48 
     49 
     50 def print_status(prefix: str, msg: str):
     51  # No need to also output to the terminal if quiet.
     52  if OptionsManager.is_quiet():
     53    return
     54  # Shrink the message (leaving a 2-char prefix and use the rest of the room
     55  # for the suffix) according to terminal size so it is always one line.
     56  width = shutil.get_terminal_size().columns
     57  max_msg_width = width - len(prefix)
     58  if len(msg) > max_msg_width:
     59    length_to_show = max_msg_width - 5  # Account for ellipsis and header.
     60    msg = f'{msg[:2]}...{msg[-length_to_show:]}'
     61  # \r to return the carriage to the beginning of line.
     62  # \033[K to replace the normal \n to erase until the end of the line.
     63  # Avoid the default line ending so the next \r overwrites the same line just
     64  #     like ninja's output.
     65  print(f'\r{prefix}{msg}\033[K', end='', flush=True)
     66 
     67 
     68 def _exception_hook(exctype: type, exc: Exception, tb):
     69  # Let KeyboardInterrupt through.
     70  if issubclass(exctype, KeyboardInterrupt):
     71    sys.__excepthook__(exctype, exc, tb)
     72    return
     73  stacktrace = ''.join(traceback.format_exception(exctype, exc, tb))
     74  stacktrace_lines = [f'\n{line}' for line in stacktrace.splitlines()]
     75  # Output uncaught exceptions to all live terminals
     76  # Extra newline since siso's output often erases the current line.
     77  BuildManager.broadcast(''.join(stacktrace_lines) + '\n')
     78  # Cancel all pending tasks cleanly (i.e. delete stamp files if necessary).
     79  TaskManager.deactivate()
     80  # Reset all remote terminal titles.
     81  BuildManager.update_remote_titles('')
     82 
     83 
     84 # Stores global options so as to not keep passing along and storing options
     85 # everywhere.
     86 class OptionsManager:
     87  _options = None
     88 
     89  @classmethod
     90  def set_options(cls, options):
     91    cls._options = options
     92 
     93  @classmethod
     94  def is_quiet(cls):
     95    assert cls._options is not None
     96    return cls._options.quiet
     97 
     98  @classmethod
     99  def should_remote_print(cls):
    100    assert cls._options is not None
    101    return not cls._options.no_remote_print
    102 
    103 
    104 class LogfileManager:
    105  _logfiles: dict[str, IO[str]] = {}
    106  _lock = threading.RLock()
    107 
    108  @classmethod
    109  def create_logfile(cls, build_id, outdir):
    110    with cls._lock:
    111      if logfile := cls._logfiles.get(build_id, None):
    112        return logfile
    113 
    114      outdir = pathlib.Path(outdir)
    115      latest_logfile = outdir / f'{_LOGFILE_NAME}.0'
    116 
    117      if latest_logfile.exists():
    118        with latest_logfile.open('rt') as f:
    119          first_line = f.readline()
    120          if log_build_id := BUILD_ID_RE.search(first_line):
    121            # If the newest logfile on disk is referencing the same build we are
    122            # currently processing, we probably crashed previously and we should
    123            # pick up where we left off in the same logfile.
    124            if log_build_id.group('build_id') == build_id:
    125              cls._logfiles[build_id] = latest_logfile.open('at')
    126              return cls._logfiles[build_id]
    127 
    128      # Do the logfile name shift.
    129      filenames = os.listdir(outdir)
    130      logfiles = {f for f in filenames if f.startswith(_LOGFILE_NAME)}
    131      for idx in reversed(range(_MAX_LOGFILES)):
    132        current_name = f'{_LOGFILE_NAME}.{idx}'
    133        next_name = f'{_LOGFILE_NAME}.{idx+1}'
    134        if current_name in logfiles:
    135          shutil.move(os.path.join(outdir, current_name),
    136                      os.path.join(outdir, next_name))
    137 
    138      # Create a new 0th logfile.
    139      logfile = latest_logfile.open('wt')
    140      logfile.write(FIRST_LOG_LINE.format(build_id=build_id, outdir=outdir))
    141      logfile.flush()
    142      cls._logfiles[build_id] = logfile
    143      return logfile
    144 
    145 
    146 class TaskStats:
    147  """Class to keep track of aggregate stats for all tasks across threads."""
    148  _num_processes = 0
    149  _completed_tasks = 0
    150  _total_tasks = 0
    151  _lock = threading.RLock()
    152 
    153  @classmethod
    154  def no_running_processes(cls):
    155    with cls._lock:
    156      return cls._num_processes == 0
    157 
    158  @classmethod
    159  def add_task(cls):
    160    with cls._lock:
    161      cls._total_tasks += 1
    162 
    163  @classmethod
    164  def add_process(cls):
    165    with cls._lock:
    166      cls._num_processes += 1
    167 
    168  @classmethod
    169  def remove_process(cls):
    170    with cls._lock:
    171      cls._num_processes -= 1
    172 
    173  @classmethod
    174  def complete_task(cls):
    175    with cls._lock:
    176      cls._completed_tasks += 1
    177 
    178  @classmethod
    179  def num_pending_tasks(cls):
    180    with cls._lock:
    181      return cls._total_tasks - cls._completed_tasks
    182 
    183  @classmethod
    184  def num_completed_tasks(cls):
    185    with cls._lock:
    186      return cls._completed_tasks
    187 
    188  @classmethod
    189  def total_tasks(cls):
    190    with cls._lock:
    191      return cls._total_tasks
    192 
    193  @classmethod
    194  def get_title_message(cls):
    195    with cls._lock:
    196      return f'Analysis Steps: {cls._completed_tasks}/{cls._total_tasks}'
    197 
    198  @classmethod
    199  def query_build(cls, query_build_id: str = None):
    200    builds = []
    201    if query_build_id:
    202      if build := BuildManager.get_build(query_build_id):
    203        builds.append(build)
    204    else:
    205      builds = BuildManager.get_all_builds()
    206    build_infos = []
    207    for build in builds:
    208      build_infos.append(build.query_build_info())
    209    return {
    210        'pid': os.getpid(),
    211        'builds': build_infos,
    212    }
    213 
    214  @classmethod
    215  def prefix(cls, build_id: str = None):
    216    # Ninja's prefix is: [205 processes, 6/734 @ 6.5/s : 0.922s ]
    217    # Time taken and task completion rate are not important for the build server
    218    # since it is always running in the background and uses idle priority for
    219    # its tasks.
    220    with cls._lock:
    221      if build_id:
    222        build = BuildManager.get_build(build_id)
    223        _num_processes = build.process_count()
    224        _completed_tasks = build.completed_task_count()
    225        _total_tasks = build.total_task_count()
    226      else:
    227        _num_processes = cls._num_processes
    228        _completed_tasks = cls._completed_tasks
    229        _total_tasks = cls._total_tasks
    230      word = 'process' if _num_processes == 1 else 'processes'
    231      return (f'{_num_processes} {word}, '
    232              f'{_completed_tasks}/{_total_tasks}')
    233 
    234 
    235 def check_pid_alive(pid: int):
    236  try:
    237    os.kill(pid, 0)
    238  except OSError:
    239    return False
    240  return True
    241 
    242 
@dataclasses.dataclass
class Build:
  """A build registered with the server and the analysis tasks queued for it.

  BuildManager.register_builder creates these. Each build keeps its own
  task/process counters and logfile. A build counts as active while it has
  pending tasks or while its builder process (`pid`) is still alive.
  """
  # Build id; BuildManager keys builds by env['AUTONINJA_BUILD_ID'].
  id: str
  # Builder process id, polled by is_active() via check_pid_alive().
  pid: int
  # Environment passed to each task's subprocess (see Task.start).
  env: dict
  # Stream that failed-task output is echoed to when remote printing is on.
  stdout: IO[str]
  # Output directory: tasks run here and the logfile is created here.
  cwd: Optional[str] = None
  # Opened lazily by ensure_logfile(); closed by task_done() once inactive.
  _logfile: Optional[IO[str]] = None
  # Once the builder is seen dead this stays False and is never re-checked.
  _is_ninja_alive: bool = True
  _tasks: List[Task] = dataclasses.field(default_factory=list)
  # Not annotated, so these are plain class attributes rather than dataclass
  # fields; the first `+=` on an instance creates a per-instance value.
  _completed_task_count = 0
  _active_process_count = 0
  # Reentrant because locked methods call other locked methods (e.g.
  # pending_task_count() -> total_task_count()).
  _lock: threading.RLock = dataclasses.field(default_factory=threading.RLock,
                                             repr=False,
                                             init=False)

  def __hash__(self):
    # Defined explicitly: @dataclass (eq=True, not frozen) would otherwise set
    # __hash__ to None and make builds unhashable.
    return hash((self.id, self.pid, self.cwd))

  def add_task(self, task: Task):
    """Records a newly queued task and hands it to the TaskManager."""
    self._status_update(f'QUEUED {task.name}')
    with self._lock:
      self._tasks.append(task)
    TaskStats.add_task()
    # TaskManager.add_task may start this (or other queued) tasks right away.
    TaskManager.add_task(task)

  def add_process(self, task: Task):
    """Counts a subprocess that is about to be started for task."""
    self._status_update(f'STARTING {task.name}')
    with self._lock:
      self._active_process_count += 1
    TaskStats.add_process()

  def task_done(self, task: Task, status_string: str):
    """Marks task as done with status_string (FINISHED/FAILED/TERMINATED).

    Updates per-build and global counters and refreshes remote terminal
    titles. Closes this build's logfile once the build is no longer active.
    """
    self._status_update(f'{status_string} {task.name}')
    TaskStats.complete_task()
    TaskManager.task_done(task)
    with self._lock:
      self._completed_task_count += 1

    # We synchronize all terminal title info rather than having it per build
    # since if two builds are happening in the same terminal concurrently, both
    # builds will be overriding each other's titles continuously. Usually we
    # only have the one build anyway, so it should be equivalent in most cases.
    BuildManager.update_remote_titles()
    with self._lock:
      if not self.is_active():
        self._logfile.close()
        # Reset in case it's the last build.
        BuildManager.update_remote_titles('')

  def process_complete(self):
    """Counts down a subprocess that has exited."""
    with self._lock:
      self._active_process_count -= 1
    TaskStats.remove_process()

  def ensure_logfile(self):
    """Opens this build's logfile via LogfileManager if not yet opened.

    Requires cwd to be set. A logfile that was opened and later closed is not
    reopened here, because a closed file object is still truthy.
    """
    with self._lock:
      if not self._logfile:
        assert self.cwd is not None
        self._logfile = LogfileManager.create_logfile(self.id, self.cwd)

  def log(self, message: str):
    """Appends message (plus newline) to this build's logfile and flushes."""
    with self._lock:
      self.ensure_logfile()
      if self._logfile.closed:
        # BuildManager#broadcast can call log after the build is done and the
        # log is closed. Might make sense to separate out that flow so we can
        # raise an exception here otherwise.
        return
      print(message, file=self._logfile, flush=True)

  def _status_update(self, status_message):
    """Writes a '[counts] message' status line to the log and terminal."""
    prefix = f'[{TaskStats.prefix(self.id)}] '
    self.log(f'{prefix}{status_message}')
    print_status(prefix, status_message)

  def total_task_count(self):
    with self._lock:
      return len(self._tasks)

  def completed_task_count(self):
    with self._lock:
      return self._completed_task_count

  def pending_task_count(self):
    with self._lock:
      return self.total_task_count() - self.completed_task_count()

  def process_count(self):
    with self._lock:
      return self._active_process_count

  def is_active(self):
    """Returns True while tasks are pending or the builder process is alive."""
    if self.pending_task_count() > 0:
      return True
    # Ninja is not coming back to life so only check on it if last we checked it
    # was still alive.
    if self._is_ninja_alive:
      self._is_ninja_alive = check_pid_alive(self.pid)
    return self._is_ninja_alive

  def query_build_info(self):
    """Returns a dict summarizing this build (used by TaskStats.query_build)."""
    current_tasks = TaskManager.get_current_tasks(self.id)
    return {
        'build_id': self.id,
        'is_active': self.is_active(),
        'completed_tasks': self.completed_task_count(),
        'pending_tasks': self.pending_task_count(),
        'active_tasks': [t.cmd for t in current_tasks],
        'outdir': self.cwd,
    }
    354 
    355 
    356 class BuildManager:
    357  _builds_by_id: dict[str, Build] = dict()
    358  _cached_ttys: dict[(int, int), tuple[IO[str], bool]] = dict()
    359  _lock = threading.RLock()
    360 
    361  @classmethod
    362  def register_builder(cls, env, pid, cwd):
    363    build_id = env['AUTONINJA_BUILD_ID']
    364    stdout = cls.open_tty(env['AUTONINJA_STDOUT_NAME'])
    365    # Tells the script not to re-delegate to build server.
    366    env[server_utils.BUILD_SERVER_ENV_VARIABLE] = '1'
    367 
    368    with cls._lock:
    369      build = Build(id=build_id,
    370                    pid=pid,
    371                    cwd=cwd,
    372                    env=env,
    373                    stdout=stdout)
    374      cls.maybe_init_cwd(build, cwd)
    375      cls._builds_by_id[build_id] = build
    376    cls.update_remote_titles()
    377 
    378  @classmethod
    379  def maybe_init_cwd(cls, build: Build, cwd: str):
    380    if cwd is not None:
    381      with cls._lock:
    382        if build.cwd is None:
    383          build.cwd = cwd
    384        else:
    385          assert pathlib.Path(cwd).samefile(
    386              build.cwd), f'{repr(cwd)} != {repr(build.cwd)}'
    387        build.ensure_logfile()
    388 
    389  @classmethod
    390  def get_build(cls, build_id):
    391    with cls._lock:
    392      return cls._builds_by_id.get(build_id, None)
    393 
    394  @classmethod
    395  def open_tty(cls, tty_path):
    396    # Do not open the same tty multiple times. Use st_ino and st_dev to compare
    397    # file descriptors.
    398    tty = open(tty_path, 'at')
    399    st = os.stat(tty.fileno())
    400    tty_key = (st.st_ino, st.st_dev)
    401    with cls._lock:
    402      # Dedupes ttys
    403      if tty_key not in cls._cached_ttys:
    404        # TTYs are kept open for the lifetime of the server so that broadcast
    405        # messages (e.g. uncaught exceptions) can be sent to them even if they
    406        # are not currently building anything.
    407        cls._cached_ttys[tty_key] = (tty, tty.isatty())
    408      else:
    409        tty.close()
    410      return cls._cached_ttys[tty_key][0]
    411 
    412  @classmethod
    413  def get_active_builds(cls) -> List[Build]:
    414    builds = cls.get_all_builds()
    415    return list(build for build in builds if build.is_active())
    416 
    417  @classmethod
    418  def get_all_builds(cls) -> List[Build]:
    419    with cls._lock:
    420      return list(cls._builds_by_id.values())
    421 
    422  @classmethod
    423  def broadcast(cls, msg: str):
    424    with cls._lock:
    425      ttys = list(cls._cached_ttys.values())
    426      builds = list(cls._builds_by_id.values())
    427    if OptionsManager.should_remote_print():
    428      for tty, _unused in ttys:
    429        try:
    430          tty.write(msg + '\n')
    431          tty.flush()
    432        except BrokenPipeError:
    433          pass
    434    for build in builds:
    435      build.log(msg)
    436    # Write to the current terminal if we have not written to it yet.
    437    st = os.stat(sys.stderr.fileno())
    438    stderr_key = (st.st_ino, st.st_dev)
    439    if stderr_key not in cls._cached_ttys:
    440      print(msg, file=sys.stderr)
    441 
    442  @classmethod
    443  def update_remote_titles(cls, new_title=None):
    444    if new_title is None:
    445      if not cls.has_active_builds() and TaskStats.num_pending_tasks() == 0:
    446        # Setting an empty title causes most terminals to go back to the
    447        # default title (and at least prevents the tab title from being
    448        # "Analysis Steps: N/N" forevermore.
    449        new_title = ''
    450      else:
    451        new_title = TaskStats.get_title_message()
    452 
    453    with cls._lock:
    454      ttys = list(cls._cached_ttys.values())
    455    for tty, isatty in ttys:
    456      if isatty:
    457        try:
    458          tty.write(f'\033]2;{new_title}\007')
    459          tty.flush()
    460        except BrokenPipeError:
    461          pass
    462 
    463  @classmethod
    464  def has_active_builds(cls):
    465    return bool(cls.get_active_builds())
    466 
    467 
    468 class TaskManager:
    469  """Class to encapsulate a threadsafe queue and handle deactivating it."""
    470  _queue: collections.deque[Task] = collections.deque()
    471  _current_tasks: set[Task] = set()
    472  _deactivated = False
    473  _lock = threading.RLock()
    474 
    475  @classmethod
    476  def add_task(cls, task: Task):
    477    assert not cls._deactivated
    478    with cls._lock:
    479      cls._queue.appendleft(task)
    480    cls._maybe_start_tasks()
    481 
    482  @classmethod
    483  def task_done(cls, task: Task):
    484    with cls._lock:
    485      cls._current_tasks.discard(task)
    486 
    487  @classmethod
    488  def get_current_tasks(cls, build_id):
    489    with cls._lock:
    490      return [t for t in cls._current_tasks if t.build.id == build_id]
    491 
    492  @classmethod
    493  def deactivate(cls):
    494    cls._deactivated = True
    495    tasks_to_terminate: list[Task] = []
    496    with cls._lock:
    497      while cls._queue:
    498        task = cls._queue.pop()
    499        tasks_to_terminate.append(task)
    500      # Cancel possibly running tasks.
    501      tasks_to_terminate.extend(cls._current_tasks)
    502    # Terminate outside lock since task threads need the lock to finish
    503    # terminating.
    504    for task in tasks_to_terminate:
    505      task.terminate()
    506 
    507  @classmethod
    508  def cancel_build(cls, build_id):
    509    terminated_pending_tasks: list[Task] = []
    510    terminated_current_tasks: list[Task] = []
    511    with cls._lock:
    512      # Cancel pending tasks.
    513      for task in cls._queue:
    514        if task.build.id == build_id:
    515          terminated_pending_tasks.append(task)
    516      for task in terminated_pending_tasks:
    517        cls._queue.remove(task)
    518      # Cancel running tasks.
    519      for task in cls._current_tasks:
    520        if task.build.id == build_id:
    521          terminated_current_tasks.append(task)
    522    # Terminate tasks outside lock since task threads need the lock to finish
    523    # terminating.
    524    for task in terminated_pending_tasks:
    525      task.terminate()
    526    for task in terminated_current_tasks:
    527      task.terminate()
    528 
    529  @staticmethod
    530  # pylint: disable=inconsistent-return-statements
    531  def _num_running_processes():
    532    with open('/proc/stat') as f:
    533      for line in f:
    534        if line.startswith('procs_running'):
    535          return int(line.rstrip().split()[1])
    536    assert False, 'Could not read /proc/stat'
    537 
    538  @classmethod
    539  def _maybe_start_tasks(cls):
    540    if cls._deactivated:
    541      return
    542    # Include load avg so that a small dip in the number of currently running
    543    # processes will not cause new tasks to be started while the overall load is
    544    # heavy.
    545    cur_load = max(cls._num_running_processes(), os.getloadavg()[0])
    546    num_started = 0
    547    # Always start a task if we don't have any running, so that all tasks are
    548    # eventually finished. Try starting up tasks when the overall load is light.
    549    # Limit to at most 2 new tasks to prevent ramping up too fast. There is a
    550    # chance where multiple threads call _maybe_start_tasks and each gets to
    551    # spawn up to 2 new tasks, but since the only downside is some build tasks
    552    # get worked on earlier rather than later, it is not worth mitigating.
    553    while num_started < 2 and (TaskStats.no_running_processes()
    554                               or num_started + cur_load < os.cpu_count()):
    555      with cls._lock:
    556        try:
    557          next_task = cls._queue.pop()
    558          cls._current_tasks.add(next_task)
    559        except IndexError:
    560          return
    561      num_started += next_task.start(cls._maybe_start_tasks)
    562 
    563 
    564 # TODO(wnwen): Break this into Request (encapsulating what ninja sends) and Task
    565 #              when a Request starts to be run. This would eliminate ambiguity
    566 #              about when and whether _proc/_thread are initialized.
    567 class Task:
    568  """Class to represent one task and operations on it."""
    569 
    570  def __init__(self, name: str, build: Build, cmd: List[str], stamp_file: str):
    571    self.name = name
    572    self.build = build
    573    self.cmd = cmd
    574    self.stamp_file = stamp_file
    575    self._terminated = False
    576    self._replaced = False
    577    self._lock = threading.RLock()
    578    self._proc: Optional[subprocess.Popen] = None
    579    self._thread: Optional[threading.Thread] = None
    580    self._delete_stamp_thread: Optional[threading.Thread] = None
    581    self._return_code: Optional[int] = None
    582 
    583  @property
    584  def key(self):
    585    return (self.build.cwd, self.name)
    586 
    587  def __hash__(self):
    588    return hash((self.key, self.build.id))
    589 
    590  def __eq__(self, other):
    591    return self.key == other.key and self.build is other.build
    592 
    593  def start(self, on_complete_callback: Callable[[], None]) -> int:
    594    """Starts the task if it has not already been terminated.
    595 
    596    Returns the number of processes that have been started. This is called at
    597    most once when the task is popped off the task queue."""
    598    with self._lock:
    599      if self._terminated:
    600        return 0
    601 
    602      # Use os.nice(19) to ensure the lowest priority (idle) for these analysis
    603      # tasks since we want to avoid slowing down the actual build.
    604      # TODO(wnwen): Use ionice to reduce resource consumption.
    605      self.build.add_process(self)
    606      # This use of preexec_fn is sufficiently simple, just one os.nice call.
    607      # pylint: disable=subprocess-popen-preexec-fn
    608      self._proc = subprocess.Popen(
    609          self.cmd,
    610          stdout=subprocess.PIPE,
    611          stderr=subprocess.STDOUT,
    612          cwd=self.build.cwd,
    613          env=self.build.env,
    614          text=True,
    615          preexec_fn=lambda: os.nice(19),
    616      )
    617      self._thread = threading.Thread(
    618          target=self._complete_when_process_finishes,
    619          args=(on_complete_callback, ))
    620      self._thread.start()
    621      return 1
    622 
    623  def terminate(self, replaced=False):
    624    """Can be called multiple times to cancel and ignore the task's output."""
    625    with self._lock:
    626      if self._terminated:
    627        return
    628      self._terminated = True
    629      self._replaced = replaced
    630 
    631    # It is safe to access _proc and _thread outside of _lock since they are
    632    # only changed by self.start holding _lock when self._terminate is false.
    633    # Since we have just set self._terminate to true inside of _lock, we know
    634    # that neither _proc nor _thread will be changed from this point onwards.
    635    if self._proc:
    636      self._proc.terminate()
    637      self._proc.wait()
    638    # Ensure that self._complete is called either by the thread or by us.
    639    if self._thread:
    640      self._thread.join()
    641    else:
    642      self._complete()
    643 
    644  def _complete_when_process_finishes(self,
    645                                      on_complete_callback: Callable[[], None]):
    646    assert self._proc
    647    # We know Popen.communicate will return a str and not a byte since it is
    648    # constructed with text=True.
    649    stdout: str = self._proc.communicate()[0]
    650    self._return_code = self._proc.returncode
    651    self.build.process_complete()
    652    self._complete(stdout)
    653    on_complete_callback()
    654 
    655  def _complete(self, stdout: str = ''):
    656    """Update the user and ninja after the task has run or been terminated.
    657 
    658    This method should only be run once per task. Avoid modifying the task so
    659    that this method does not need locking."""
    660 
    661    delete_stamp = False
    662    status_string = 'FINISHED'
    663    if self._terminated:
    664      status_string = 'TERMINATED'
    665      # When tasks are replaced, avoid deleting the stamp file, context:
    666      # https://issuetracker.google.com/301961827.
    667      if not self._replaced:
    668        delete_stamp = True
    669    elif stdout or self._return_code != 0:
    670      status_string = 'FAILED'
    671      delete_stamp = True
    672      preamble = [
    673          f'FAILED: {self.name}',
    674          f'Return code: {self._return_code}',
    675          'CMD: ' + shlex.join(self.cmd),
    676          'STDOUT:',
    677      ]
    678 
    679      message = '\n'.join(preamble + [stdout])
    680      self.build.log(message)
    681      server_log(message)
    682 
    683      if OptionsManager.should_remote_print():
    684        # Add emoji to show that output is from the build server.
    685        preamble = [f'⏩ {line}' for line in preamble]
    686        remote_message = '\n'.join(preamble + [stdout])
    687        # Add a new line at start of message to clearly delineate from previous
    688        # output/text already on the remote tty we are printing to.
    689        self.build.stdout.write(f'\n{remote_message}')
    690        self.build.stdout.flush()
    691    if delete_stamp:
    692      # Force siso to consider failed targets as dirty.
    693      try:
    694        os.unlink(os.path.join(self.build.cwd, self.stamp_file))
    695      except FileNotFoundError:
    696        pass
    697    self.build.task_done(self, status_string)
    698 
    699 
    700 def _handle_add_task(data, current_tasks: Dict[Tuple[str, str], Task]):
    701  """Handle messages of type ADD_TASK."""
    702  build_id = data['build_id']
    703  build = BuildManager.get_build(build_id)
    704  BuildManager.maybe_init_cwd(build, data.get('cwd'))
    705 
    706  new_task = Task(name=data['name'],
    707                  cmd=data['cmd'],
    708                  build=build,
    709                  stamp_file=data['stamp_file'])
    710  existing_task = current_tasks.get(new_task.key)
    711  if existing_task:
    712    existing_task.terminate(replaced=True)
    713  current_tasks[new_task.key] = new_task
    714 
    715  build.add_task(new_task)
    716 
    717 
    718 def _handle_query_build(data, connection: socket.socket):
    719  """Handle messages of type QUERY_BUILD."""
    720  build_id = data['build_id']
    721  response = TaskStats.query_build(build_id)
    722  try:
    723    with connection:
    724      server_utils.SendMessage(connection, response)
    725  except BrokenPipeError:
    726    # We should not die because the client died.
    727    pass
    728 
    729 
    730 def _handle_heartbeat(connection: socket.socket):
    731  """Handle messages of type POLL_HEARTBEAT."""
    732  try:
    733    with connection:
    734      server_utils.SendMessage(connection, {
    735          'status': 'OK',
    736          'pid': os.getpid(),
    737      })
    738  except BrokenPipeError:
    739    # We should not die because the client died.
    740    pass
    741 
    742 
    743 def _handle_register_builder(data):
    744  """Handle messages of type REGISTER_BUILDER."""
    745  env = data['env']
    746  pid = int(data['builder_pid'])
    747  cwd = data['cwd']
    748 
    749  BuildManager.register_builder(env, pid, cwd)
    750 
    751 
    752 def _handle_cancel_build(data):
    753  """Handle messages of type CANCEL_BUILD."""
    754  build_id = data['build_id']
    755  TaskManager.cancel_build(build_id)
    756  BuildManager.update_remote_titles('')
    757 
    758 
    759 def _listen_for_request_data(sock: socket.socket):
    760  """Helper to encapsulate getting a new message."""
    761  while True:
    762    conn = sock.accept()[0]
    763    message = server_utils.ReceiveMessage(conn)
    764    if message:
    765      yield message, conn
    766 
    767 
    768 def _register_cleanup_signal_handlers():
    769  original_sigint_handler = signal.getsignal(signal.SIGINT)
    770  original_sigterm_handler = signal.getsignal(signal.SIGTERM)
    771 
    772  def _cleanup(signum, frame):
    773    server_log('STOPPING SERVER...')
    774    # Gracefully shut down the task manager, terminating all queued tasks.
    775    TaskManager.deactivate()
    776    server_log('STOPPED')
    777    if signum == signal.SIGINT:
    778      if callable(original_sigint_handler):
    779        original_sigint_handler(signum, frame)
    780      else:
    781        raise KeyboardInterrupt()
    782    if signum == signal.SIGTERM:
    783      # Sometimes sigterm handler is not a callable.
    784      if callable(original_sigterm_handler):
    785        original_sigterm_handler(signum, frame)
    786      else:
    787        sys.exit(1)
    788 
    789  signal.signal(signal.SIGINT, _cleanup)
    790  signal.signal(signal.SIGTERM, _cleanup)
    791 
    792 
    793 def _process_requests(sock: socket.socket, exit_on_idle: bool):
    794  """Main loop for build server receiving request messages."""
    795  # Since dicts in python can contain anything, explicitly type tasks to help
    796  # make static type checking more useful.
    797  tasks: Dict[Tuple[str, str], Task] = {}
    798  server_log(
    799      'READY... Remember to set android_static_analysis="build_server" in '
    800      'args.gn files')
    801  _register_cleanup_signal_handlers()
    802  # pylint: disable=too-many-nested-blocks
    803  while True:
    804    try:
    805      for data, connection in _listen_for_request_data(sock):
    806        message_type = data.get('message_type', server_utils.ADD_TASK)
    807        if message_type == server_utils.POLL_HEARTBEAT:
    808          _handle_heartbeat(connection)
    809        elif message_type == server_utils.ADD_TASK:
    810          connection.close()
    811          _handle_add_task(data, tasks)
    812        elif message_type == server_utils.QUERY_BUILD:
    813          _handle_query_build(data, connection)
    814        elif message_type == server_utils.REGISTER_BUILDER:
    815          connection.close()
    816          _handle_register_builder(data)
    817        elif message_type == server_utils.CANCEL_BUILD:
    818          connection.close()
    819          _handle_cancel_build(data)
    820        else:
    821          connection.close()
    822    except TimeoutError:
    823      # If we have not received a new task in a while and do not have any
    824      # pending tasks or running builds, then exit. Otherwise keep waiting.
    825      if (TaskStats.num_pending_tasks() == 0
    826          and not BuildManager.has_active_builds() and exit_on_idle):
    827        break
    828    except KeyboardInterrupt:
    829      break
    830  BuildManager.update_remote_titles('')
    831 
    832 
    833 def query_build_info(build_id=None):
    834  """Communicates with the main server to query build info."""
    835  return _send_message_with_response({
    836      'message_type': server_utils.QUERY_BUILD,
    837      'build_id': build_id,
    838  })
    839 
    840 
    841 def _wait_for_build(build_id):
    842  """Comunicates with the main server waiting for a build to complete."""
    843  start_time = datetime.datetime.now()
    844  while True:
    845    try:
    846      build_info = query_build_info(build_id)['builds'][0]
    847    except ConnectionRefusedError:
    848      print('No server running. It likely finished all tasks.')
    849      print('You can check $OUTDIR/buildserver.log.0 to be sure.')
    850      return 0
    851 
    852    pending_tasks = build_info['pending_tasks']
    853 
    854    if pending_tasks == 0:
    855      print(f'\nAll tasks completed for build_id: {build_id}.')
    856      return 0
    857 
    858    current_time = datetime.datetime.now()
    859    duration = current_time - start_time
    860    print(f'\rWaiting for {pending_tasks} tasks [{str(duration)}]\033[K',
    861          end='',
    862          flush=True)
    863    time.sleep(1)
    864 
    865 
    866 def _wait_for_idle():
    867  """Communicates with the main server waiting for all builds to complete."""
    868  start_time = datetime.datetime.now()
    869  while True:
    870    try:
    871      builds = query_build_info()['builds']
    872    except ConnectionRefusedError:
    873      print('No server running. It likely finished all tasks.')
    874      print('You can check $OUTDIR/buildserver.log.0 to be sure.')
    875      return 0
    876 
    877    all_pending_tasks = 0
    878    all_completed_tasks = 0
    879    for build_info in builds:
    880      pending_tasks = build_info['pending_tasks']
    881      completed_tasks = build_info['completed_tasks']
    882      active = build_info['is_active']
    883      # Ignore completed builds.
    884      if active or pending_tasks:
    885        all_pending_tasks += pending_tasks
    886        all_completed_tasks += completed_tasks
    887    total_tasks = all_pending_tasks + all_completed_tasks
    888 
    889    if all_pending_tasks == 0:
    890      print('\nServer Idle, All tasks complete.')
    891      return 0
    892 
    893    current_time = datetime.datetime.now()
    894    duration = current_time - start_time
    895    print(
    896        f'\rWaiting for {all_pending_tasks} remaining tasks. '
    897        f'({all_completed_tasks}/{total_tasks} tasks complete) '
    898        f'[{str(duration)}]\033[K',
    899        end='',
    900        flush=True)
    901    time.sleep(0.5)
    902 
    903 
    904 def _check_if_running():
    905  """Communicates with the main server to make sure its running."""
    906  with socket.socket(socket.AF_UNIX) as sock:
    907    try:
    908      sock.connect(server_utils.SOCKET_ADDRESS)
    909    except OSError:
    910      print('Build server is not running and '
    911            'android_static_analysis="build_server" is set.\nPlease run '
    912            'this command in a separate terminal:\n\n'
    913            '$ build/android/fast_local_dev_server.py\n')
    914      return 1
    915    else:
    916      return 0
    917 
    918 
    919 def _send_message_and_close(message_dict):
    920  with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock:
    921    sock.connect(server_utils.SOCKET_ADDRESS)
    922    sock.settimeout(1)
    923    server_utils.SendMessage(sock, message_dict)
    924 
    925 
    926 def _send_message_with_response(message_dict):
    927  with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock:
    928    sock.connect(server_utils.SOCKET_ADDRESS)
    929    sock.settimeout(1)
    930    server_utils.SendMessage(sock, message_dict)
    931    return server_utils.ReceiveMessage(sock)
    932 
    933 
    934 def _send_cancel_build(build_id):
    935  _send_message_and_close({
    936      'message_type': server_utils.CANCEL_BUILD,
    937      'build_id': build_id,
    938  })
    939  return 0
    940 
    941 
    942 def _register_builder(build_id, builder_pid, output_directory):
    943  if output_directory is not None:
    944    output_directory = str(pathlib.Path(output_directory).absolute())
    945  for _attempt in range(3):
    946    try:
    947      # Ensure environment variables that the server expects to be there are
    948      # present.
    949      server_utils.AssertEnvironmentVariables()
    950 
    951      _send_message_and_close({
    952          'message_type': server_utils.REGISTER_BUILDER,
    953          'env': dict(os.environ),
    954          'builder_pid': builder_pid,
    955          'cwd': output_directory,
    956      })
    957      return 0
    958    except OSError:
    959      time.sleep(0.05)
    960  print(f'Failed to register builer for build_id={build_id}.')
    961  return 1
    962 
    963 
    964 def poll_server(retries=3):
    965  """Communicates with the main server to query build info."""
    966  for _attempt in range(retries):
    967    try:
    968      response = _send_message_with_response(
    969          {'message_type': server_utils.POLL_HEARTBEAT})
    970      if response:
    971        break
    972    except OSError:
    973      time.sleep(0.05)
    974  else:
    975    return None
    976  return response['pid']
    977 
    978 
    979 def _print_build_status_all():
    980  try:
    981    query_data = query_build_info(None)
    982  except ConnectionRefusedError:
    983    print('No server running. Consult $OUTDIR/buildserver.log.0')
    984    return 0
    985  builds = query_data['builds']
    986  pid = query_data['pid']
    987  all_active_tasks = []
    988  print(f'Build server (PID={pid}) has {len(builds)} registered builds')
    989  for build_info in builds:
    990    build_id = build_info['build_id']
    991    pending_tasks = build_info['pending_tasks']
    992    completed_tasks = build_info['completed_tasks']
    993    active_tasks = build_info['active_tasks']
    994    out_dir = build_info['outdir']
    995    active = build_info['is_active']
    996    total_tasks = pending_tasks + completed_tasks
    997    all_active_tasks += active_tasks
    998    if total_tasks == 0 and not active:
    999      status = 'Finished without any jobs'
   1000    else:
   1001      if active:
   1002        status = 'Siso still running'
   1003      else:
   1004        status = 'Siso finished'
   1005      if out_dir:
   1006        status += f' in {out_dir}'
   1007      status += f'. Completed [{completed_tasks}/{total_tasks}].'
   1008      if completed_tasks < total_tasks:
   1009        status += f' {len(active_tasks)} task(s) currently executing'
   1010    print(f'{build_id}: {status}')
   1011    if all_active_tasks:
   1012      total = len(all_active_tasks)
   1013      to_show = min(4, total)
   1014      print(f'Currently executing (showing {to_show} of {total}):')
   1015      for cmd in sorted(all_active_tasks)[:to_show]:
   1016        truncated = shlex.join(cmd)
   1017        if len(truncated) > 200:
   1018          truncated = truncated[:200] + '...'
   1019        print(truncated)
   1020  return 0
   1021 
   1022 
   1023 def _print_build_status(build_id):
   1024  server_path = os.path.relpath(str(server_utils.SERVER_SCRIPT))
   1025  try:
   1026    builds = query_build_info(build_id)['builds']
   1027    if not builds:
   1028      print(f'⚠️ No build found with id ({build_id})')
   1029      print('⚠️ To see the status of all builds:',
   1030            shlex.join([server_path, '--print-status-all']))
   1031      return 1
   1032    build_info = builds[0]
   1033  except ConnectionRefusedError:
   1034    print('⚠️ No server running. Consult $OUTDIR/buildserver.log.0')
   1035    return 0
   1036  pending_tasks = build_info['pending_tasks']
   1037 
   1038  # Print nothing unless there are still pending tasks
   1039  if pending_tasks:
   1040    is_str = 'is' if pending_tasks == 1 else 'are'
   1041    job_str = 'job' if pending_tasks == 1 else 'jobs'
   1042    print(f'⏩ There {is_str} still {pending_tasks} static analysis {job_str}'
   1043          ' running in the background.')
   1044    print('⏩ To wait for them:', shlex.join([server_path, '--wait-for-idle']))
   1045  return 0
   1046 
   1047 
   1048 def _wait_for_task_requests(exit_on_idle):
   1049  with socket.socket(socket.AF_UNIX) as sock:
   1050    sock.settimeout(_SOCKET_TIMEOUT)
   1051    try:
   1052      sock.bind(server_utils.SOCKET_ADDRESS)
   1053    except OSError as e:
   1054      # errno 98 is Address already in use
   1055      if e.errno == 98:
   1056        if not OptionsManager.is_quiet():
   1057          pid = poll_server()
   1058          print(f'Another instance is already running (pid: {pid}).',
   1059                file=sys.stderr)
   1060        return 1
   1061      raise
   1062    sock.listen()
   1063    _process_requests(sock, exit_on_idle)
   1064  return 0
   1065 
   1066 
   1067 def main():
   1068  # pylint: disable=too-many-return-statements
   1069  parser = argparse.ArgumentParser(description=__doc__)
   1070  parser.add_argument(
   1071      '--fail-if-not-running',
   1072      action='store_true',
   1073      help='Used by GN to fail fast if the build server is not running.')
   1074  parser.add_argument(
   1075      '--exit-on-idle',
   1076      action='store_true',
   1077      help='Server started on demand. Exit when all tasks run out.')
   1078  parser.add_argument('--quiet',
   1079                      action='store_true',
   1080                      help='Do not output status updates.')
   1081  parser.add_argument('--no-remote-print',
   1082                      action='store_true',
   1083                      help='Do not output errors to remote terminals.')
   1084  parser.add_argument('--wait-for-build',
   1085                      metavar='BUILD_ID',
   1086                      help='Wait for build server to finish with all tasks '
   1087                      'for BUILD_ID and output any pending messages.')
   1088  parser.add_argument('--wait-for-idle',
   1089                      action='store_true',
   1090                      help='Wait for build server to finish with all '
   1091                      'pending tasks.')
   1092  parser.add_argument('--print-status',
   1093                      metavar='BUILD_ID',
   1094                      help='Print the current state of a build.')
   1095  parser.add_argument('--print-status-all',
   1096                      action='store_true',
   1097                      help='Print the current state of all active builds.')
   1098  parser.add_argument(
   1099      '--register-build-id',
   1100      metavar='BUILD_ID',
   1101      help='Inform the build server that a new build has started.')
   1102  parser.add_argument('--output-directory',
   1103                      help='Build directory (use with --register-build-id)')
   1104  parser.add_argument('--builder-pid',
   1105                      help='Builder process\'s pid for build BUILD_ID.')
   1106  parser.add_argument('--cancel-build',
   1107                      metavar='BUILD_ID',
   1108                      help='Cancel all pending and running tasks for BUILD_ID.')
   1109  args = parser.parse_args()
   1110  OptionsManager.set_options(args)
   1111 
   1112  if args.fail_if_not_running:
   1113    return _check_if_running()
   1114  if args.wait_for_build:
   1115    return _wait_for_build(args.wait_for_build)
   1116  if args.wait_for_idle:
   1117    return _wait_for_idle()
   1118  if args.print_status:
   1119    return _print_build_status(args.print_status)
   1120  if args.print_status_all:
   1121    return _print_build_status_all()
   1122  if args.register_build_id:
   1123    return _register_builder(args.register_build_id, args.builder_pid,
   1124                             args.output_directory)
   1125  if args.cancel_build:
   1126    return _send_cancel_build(args.cancel_build)
   1127  return _wait_for_task_requests(args.exit_on_idle)
   1128 
   1129 
   1130 if __name__ == '__main__':
   1131  sys.excepthook = _exception_hook
   1132  sys.exit(main())