tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

progressbar.py (3904B)


      1 # Text progress bar library, like curl or scp.
      2 
      3 import math
      4 import sys
      5 from datetime import datetime, timedelta
      6 
      7 if sys.platform.startswith("win"):
      8    from .terminal_win import Terminal
      9 else:
     10    from .terminal_unix import Terminal
     11 
     12 
     13 class NullProgressBar:
     14    def update(self, current, data):
     15        pass
     16 
     17    def poke(self):
     18        pass
     19 
     20    def finish(self, complete=True):
     21        pass
     22 
     23    def beginline(self):
     24        pass
     25 
     26    def message(self, msg):
     27        sys.stdout.write(msg + "\n")
     28 
     29    @staticmethod
     30    def update_granularity():
     31        return timedelta.max
     32 
     33 
     34 class ProgressBar:
     35    def __init__(self, limit, fmt):
     36        assert self.conservative_isatty()
     37 
     38        self.prior = None
     39        self.atLineStart = True
     40        # [{str:str}] Describtion of how to lay out each field in the counters map.
     41        self.counters_fmt = fmt
     42        # int: The value of 'current' equal to 100%.
     43        self.limit = limit
     44        # int: max digits in limit
     45        self.limit_digits = int(math.ceil(math.log10(self.limit)))
     46        # datetime: The start time.
     47        self.t0 = datetime.now()
     48        # datetime: Optional, the last time update() ran.
     49        self.last_update_time = None
     50 
     51        # Compute the width of the counters and build the format string.
     52        self.counters_width = 1  # [
     53        for layout in self.counters_fmt:
     54            self.counters_width += self.limit_digits
     55            # | (or ']' for the last one)
     56            self.counters_width += 1
     57 
     58        self.barlen = 64 - self.counters_width
     59 
     60    @staticmethod
     61    def update_granularity():
     62        return timedelta(seconds=0.1)
     63 
     64    def update(self, current, data):
     65        # Record prior for poke.
     66        self.prior = (current, data)
     67        self.atLineStart = False
     68 
     69        # Build counters string.
     70        sys.stdout.write("\r[")
     71        for layout in self.counters_fmt:
     72            Terminal.set_color(layout["color"])
     73            sys.stdout.write(
     74                ("{:" + str(self.limit_digits) + "d}").format(data[layout["value"]])
     75            )
     76            Terminal.reset_color()
     77            if layout != self.counters_fmt[-1]:
     78                sys.stdout.write("|")
     79            else:
     80                sys.stdout.write("] ")
     81 
     82        # Build the bar.
     83        pct = int(100.0 * current / self.limit)
     84        sys.stdout.write(f"{pct:3d}% ")
     85 
     86        barlen = int(1.0 * self.barlen * current / self.limit) - 1
     87        bar = "=" * barlen + ">" + " " * (self.barlen - barlen - 1)
     88        sys.stdout.write(bar + "|")
     89 
     90        # Update the bar.
     91        now = datetime.now()
     92        dt = now - self.t0
     93        dt = dt.seconds + dt.microseconds * 1e-6
     94        sys.stdout.write(f"{dt:6.1f}s")
     95        Terminal.clear_right()
     96 
     97        # Force redisplay, since we didn't write a \n.
     98        sys.stdout.flush()
     99 
    100        self.last_update_time = now
    101 
    102    def poke(self):
    103        if not self.prior:
    104            return
    105        if datetime.now() - self.last_update_time < self.update_granularity():
    106            return
    107        self.update(*self.prior)
    108 
    109    def finish(self, complete=True):
    110        if not self.prior:
    111            sys.stdout.write(
    112                "No test run... You can try adding"
    113                " --run-slow-tests or --run-skipped to run more tests\n"
    114            )
    115            return
    116        final_count = self.limit if complete else self.prior[0]
    117        self.update(final_count, self.prior[1])
    118        sys.stdout.write("\n")
    119 
    120    def beginline(self):
    121        if not self.atLineStart:
    122            sys.stdout.write("\n")
    123            self.atLineStart = True
    124 
    125    def message(self, msg):
    126        self.beginline()
    127        sys.stdout.write(msg)
    128        sys.stdout.write("\n")
    129 
    130    @staticmethod
    131    def conservative_isatty():
    132        """
    133        Prefer erring on the side of caution and not using terminal commands if
    134        the current output stream may be a file.
    135        """
    136        return sys.stdout.isatty()