tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

adb_logcat_monitor.py (5035B)


      1 #!/usr/bin/env python3
      2 #
      3 # Copyright 2012 The Chromium Authors
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 """Saves logcats from all connected devices.
      8 
      9 Usage: adb_logcat_monitor.py <base_dir> [<adb_binary_path>]
     10 
     11 This script will repeatedly poll adb for new devices and save logcats
     12 inside the <base_dir> directory, which it attempts to create.  The
     13 script will run until killed by an external signal.  To test, run the
     14 script in a shell and <Ctrl>-C it after a while.  It should be
     15 resilient across phone disconnects and reconnects and start the logcat
     16 early enough to not miss anything.
     17 """
     18 
     19 
     20 import logging
     21 import os
     22 import re
     23 import shutil
     24 import signal
     25 import subprocess
     26 import sys
     27 import time
     28 
# Map from device_id -> (process, logcat_num)
#   process: the subprocess.Popen running "adb logcat" for that device, or
#     None when no logcat has been started for it yet.
#   logcat_num: number used in the name of the next logcat file created for
#     the device; it is incremented each time a logcat process is started.
devices = {}
     31 
     32 
     33 class TimeoutException(Exception):
     34  """Exception used to signal a timeout."""
     35 
     36 
     37 class SigtermError(Exception):
     38  """Exception used to catch a sigterm."""
     39 
     40 
     41 def StartLogcatIfNecessary(device_id, adb_cmd, base_dir):
     42  """Spawns a adb logcat process if one is not currently running."""
     43  process, logcat_num = devices[device_id]
     44  if process:
     45    if process.poll() is None:
     46      # Logcat process is still happily running
     47      return
     48    logging.info('Logcat for device %s has died', device_id)
     49    error_filter = re.compile('- waiting for device -')
     50    for line in process.stderr:
     51      line_str = line.decode('utf8', 'replace')
     52      if not error_filter.match(line_str):
     53        logging.error(device_id + ':   ' + line_str)
     54 
     55  logging.info('Starting logcat %d for device %s', logcat_num,
     56               device_id)
     57  logcat_filename = 'logcat_%s_%03d' % (device_id, logcat_num)
     58  logcat_file = open(os.path.join(base_dir, logcat_filename), 'w')
     59  process = subprocess.Popen([adb_cmd, '-s', device_id,
     60                              'logcat', '-v', 'threadtime'],
     61                             stdout=logcat_file,
     62                             stderr=subprocess.PIPE)
     63  devices[device_id] = (process, logcat_num + 1)
     64 
     65 
     66 def GetAttachedDevices(adb_cmd):
     67  """Gets the device list from adb.
     68 
     69  We use an alarm in this function to avoid deadlocking from an external
     70  dependency.
     71 
     72  Args:
     73    adb_cmd: binary to run adb
     74 
     75  Returns:
     76    list of devices or an empty list on timeout
     77  """
     78  signal.alarm(2)
     79  try:
     80    out, err = subprocess.Popen([adb_cmd, 'devices'],
     81                                stdout=subprocess.PIPE,
     82                                stderr=subprocess.PIPE).communicate()
     83    if err:
     84      logging.warning('adb device error %s', err.strip())
     85    return re.findall('^(\\S+)\tdevice$', out.decode('latin1'), re.MULTILINE)
     86  except TimeoutException:
     87    logging.warning('"adb devices" command timed out')
     88    return []
     89  except (IOError, OSError):
     90    logging.exception('Exception from "adb devices"')
     91    return []
     92  finally:
     93    signal.alarm(0)
     94 
     95 
     96 def main(base_dir, adb_cmd='adb'):
     97  """Monitor adb forever.  Expects a SIGINT (Ctrl-C) to kill."""
     98  # We create the directory to ensure 'run once' semantics
     99  if os.path.exists(base_dir):
    100    print('adb_logcat_monitor: %s already exists? Cleaning' % base_dir)
    101    shutil.rmtree(base_dir, ignore_errors=True)
    102 
    103  os.makedirs(base_dir)
    104  logging.basicConfig(filename=os.path.join(base_dir, 'eventlog'),
    105                      level=logging.INFO,
    106                      format='%(asctime)-2s %(levelname)-8s %(message)s')
    107 
    108  # Set up the alarm for calling 'adb devices'. This is to ensure
    109  # our script doesn't get stuck waiting for a process response
    110  def TimeoutHandler(_signum, _unused_frame):
    111    raise TimeoutException()
    112  signal.signal(signal.SIGALRM, TimeoutHandler)
    113 
    114  # Handle SIGTERMs to ensure clean shutdown
    115  def SigtermHandler(_signum, _unused_frame):
    116    raise SigtermError()
    117  signal.signal(signal.SIGTERM, SigtermHandler)
    118 
    119  logging.info('Started with pid %d', os.getpid())
    120  pid_file_path = os.path.join(base_dir, 'LOGCAT_MONITOR_PID')
    121 
    122  try:
    123    with open(pid_file_path, 'w') as f:
    124      f.write(str(os.getpid()))
    125    while True:
    126      for device_id in GetAttachedDevices(adb_cmd):
    127        if not device_id in devices:
    128          subprocess.call([adb_cmd, '-s', device_id, 'logcat', '-c'])
    129          devices[device_id] = (None, 0)
    130 
    131      for device in devices:
    132        # This will spawn logcat watchers for any device ever detected
    133        StartLogcatIfNecessary(device, adb_cmd, base_dir)
    134 
    135      time.sleep(5)
    136  except SigtermError:
    137    logging.info('Received SIGTERM, shutting down')
    138  except: # pylint: disable=bare-except
    139    logging.exception('Unexpected exception in main.')
    140  finally:
    141    for process, _ in devices.values():
    142      if process:
    143        try:
    144          process.terminate()
    145        except OSError:
    146          pass
    147    os.remove(pid_file_path)
    148 
    149 
    150 if __name__ == '__main__':
    151  if 2 <= len(sys.argv) <= 3:
    152    print('adb_logcat_monitor: Initializing')
    153    if len(sys.argv) == 2:
    154      sys.exit(main(sys.argv[1]))
    155    sys.exit(main(sys.argv[1], sys.argv[2]))
    156 
    157  print('Usage: %s <base_dir> [<adb_binary_path>]' % sys.argv[0])