progressbar.py (3904B)
# Text progress bar library, like curl or scp.

import math
import sys
from datetime import datetime, timedelta

if sys.platform.startswith("win"):
    from .terminal_win import Terminal
else:
    from .terminal_unix import Terminal


class NullProgressBar:
    """Stand-in with the same methods as ProgressBar that draws no bar.

    All progress-related calls do nothing; only message() produces output.
    """

    def update(self, current, data):
        """Ignore a progress report."""
        pass

    def poke(self):
        """Do nothing; there is no bar to redraw."""
        pass

    def finish(self, complete=True):
        """Do nothing; there is no bar to finalize."""
        pass

    def beginline(self):
        """Do nothing; no partial bar line is ever left on screen."""
        pass

    def message(self, msg):
        """Write *msg* followed by a newline to sys.stdout."""
        sys.stdout.write(msg + "\n")

    @staticmethod
    def update_granularity():
        """Return the minimum interval between redraws (timedelta.max here)."""
        return timedelta.max


class ProgressBar:
    """Single-line text progress bar redrawn in place on sys.stdout.

    Every redraw starts with a carriage return and rewrites the line as
    ``[counters] pct% ====>    |elapsed``.
    """

    # Columns shared between the counters block and the bar itself.
    _COUNTERS_AND_BAR_WIDTH = 64

    def __init__(self, limit, fmt):
        """Set up a bar that reaches 100% when 'current' equals *limit*.

        limit: int, the value of 'current' that corresponds to 100%.
        fmt: sequence of dicts, one per counter. update() reads the
            "color" and "value" keys of each entry.

        Raises AssertionError when sys.stdout is not a terminal.
        """
        # The bar is drawn with terminal commands, so refuse to start when
        # stdout might be a file.
        assert self.conservative_isatty()

        # (current, data) from the most recent update(), or None.
        self.prior = None
        # bool: False while a partially drawn bar occupies the current line.
        self.atLineStart = True
        # [{str:str}] Description of how to lay out each field in the counters map.
        self.counters_fmt = fmt
        # int: The value of 'current' equal to 100%.
        self.limit = limit
        # int: max digits in limit
        self.limit_digits = self._digit_count(limit)
        # datetime: The start time.
        self.t0 = datetime.now()
        # datetime: Optional, the last time update() ran.
        self.last_update_time = None

        # Width of the counters block: the opening '[' plus, for each
        # counter, its digits and one trailing '|' (or ']' for the last one).
        self.counters_width = 1 + len(self.counters_fmt) * (self.limit_digits + 1)

        self.barlen = self._COUNTERS_AND_BAR_WIDTH - self.counters_width

    @staticmethod
    def _digit_count(value):
        """Return how many decimal digits *value* prints with (at least 1)."""
        # String length is exact for powers of ten and for 0, where
        # ceil(log10(value)) undercounts or raises.
        return len(str(abs(int(value))))

    @staticmethod
    def update_granularity():
        """Return the minimum interval between redraws triggered by poke()."""
        return timedelta(seconds=0.1)

    def update(self, current, data):
        """Redraw the bar for progress value *current*.

        current: progress so far, measured against self.limit.
        data: mapping looked up with each layout's "value" key; the values
            are formatted with 'd', so they must be integers.
        """
        # Record prior for poke.
        self.prior = (current, data)
        self.atLineStart = False

        # Build counters string.
        sys.stdout.write("\r[")
        last_index = len(self.counters_fmt) - 1
        for index, layout in enumerate(self.counters_fmt):
            Terminal.set_color(layout["color"])
            value = data[layout["value"]]
            sys.stdout.write(f"{value:{self.limit_digits}d}")
            Terminal.reset_color()
            # Pick the separator by position, so two equal layout dicts do
            # not both get the closing bracket.
            if index != last_index:
                sys.stdout.write("|")
            else:
                sys.stdout.write("] ")

        # Build the bar.
        if self.limit > 0:
            pct = int(100.0 * current / self.limit)
            filled = int(1.0 * self.barlen * current / self.limit) - 1
        else:
            # Nothing to do counts as done; this also avoids dividing by zero.
            pct = 100
            filled = self.barlen - 1
        sys.stdout.write(f"{pct:3d}% ")

        # Clamp so the bar keeps a constant width at current == 0 and when
        # current overshoots the limit.
        filled = max(0, min(filled, self.barlen - 1))
        bar = "=" * filled + ">" + " " * (self.barlen - filled - 1)
        sys.stdout.write(bar + "|")

        # Append the elapsed time. total_seconds() includes whole days,
        # which .seconds alone would drop.
        now = datetime.now()
        elapsed = (now - self.t0).total_seconds()
        sys.stdout.write(f"{elapsed:6.1f}s")
        Terminal.clear_right()

        # Force redisplay, since we didn't write a \n.
        sys.stdout.flush()

        self.last_update_time = now

    def poke(self):
        """Redraw with the last reported values if enough time has passed.

        Does nothing before the first update(), or when the previous redraw
        was less than update_granularity() ago.
        """
        if self.prior is None:
            return
        # last_update_time stays None if an earlier update() failed part-way;
        # in that case just attempt the redraw.
        if self.last_update_time is not None:
            since_last = datetime.now() - self.last_update_time
            if since_last < self.update_granularity():
                return
        self.update(*self.prior)

    def finish(self, complete=True):
        """Draw the bar one last time and end the line.

        complete: when true the bar is drawn at self.limit; otherwise at
            the last reported 'current'.

        If update() was never called, a hint is printed instead of a bar.
        """
        if self.prior is None:
            sys.stdout.write(
                "No test run... You can try adding"
                " --run-slow-tests or --run-skipped to run more tests\n"
            )
            return
        last_current, last_data = self.prior
        final_count = self.limit if complete else last_current
        self.update(final_count, last_data)
        sys.stdout.write("\n")

    def beginline(self):
        """Move to a fresh line if a partially drawn bar occupies this one."""
        if not self.atLineStart:
            sys.stdout.write("\n")
            self.atLineStart = True

    def message(self, msg):
        """Write *msg* on its own line, below any bar currently displayed."""
        self.beginline()
        sys.stdout.write(msg)
        sys.stdout.write("\n")

    @staticmethod
    def conservative_isatty():
        """
        Prefer erring on the side of caution and not using terminal commands if
        the current output stream may be a file.
        """
        return sys.stdout.isatty()