progress.py (6399B)
"""
clint.textui.progress
~~~~~~~~~~~~~~~~~
This module provides the progressbar functionality.


ISC License

Copyright (c) 2011, Kenneth Reitz <me@kennethreitz.com>

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""

import sys
import time

# Every progress indicator in this module writes to this stream.
STREAM = sys.stderr

# Both templates end in "\r" so the next write redraws the same terminal line.
BAR_TEMPLATE = "%s[%s%s] %i/%i - %s\r"
MILL_TEMPLATE = "%s %s %i/%i\r"

DOTS_CHAR = "."
BAR_FILLED_CHAR = "#"
BAR_EMPTY_CHAR = " "
MILL_CHARS = ["|", "/", "-", "\\"]

# How long to wait (in seconds) before recalculating the ETA
ETA_INTERVAL = 1
# How many intervals (excluding the current one) are used to calculate the
# simple moving average
ETA_SMA_WINDOW = 9


class Bar:
    """Textual progress bar with an ETA, written to ``STREAM``.

    Usable as a context manager: leaving the ``with`` block calls
    :meth:`done`, which draws the completed bar with the elapsed time.

    Args:
        label: Text written in front of the bar.
        width: Number of character cells between the brackets.
        hide: ``True`` suppresses all output, ``False`` forces it. ``None``
            (the default) shows the bar only when ``STREAM`` reports being a
            terminal.
        empty_char: Character used for the not-yet-completed part.
        filled_char: Character used for the completed part.
        expected_size: Total number of steps. May be left as ``None`` and
            supplied later through the ``count`` argument of :meth:`show`.
        every: The bar is redrawn only when ``progress`` is a multiple of
            this value (and when ``progress`` equals the expected size).
    """

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.done()
        return False  # we're not suppressing exceptions

    def __init__(
        self,
        label="",
        width=32,
        hide=None,
        empty_char=BAR_EMPTY_CHAR,
        filled_char=BAR_FILLED_CHAR,
        expected_size=None,
        every=1,
    ):
        self.label = label
        self.width = width
        self.hide = hide
        # Only show bar in terminals by default (better for piping, logging etc.)
        if hide is None:
            try:
                self.hide = not STREAM.isatty()
            except AttributeError:  # output does not support isatty()
                self.hide = True
        self.empty_char = empty_char
        self.filled_char = filled_char
        self.expected_size = expected_size
        self.every = every
        self.start = time.time()
        self.ittimes = []  # recent "seconds per item" samples for the ETA
        self.eta = 0
        self.etadelta = time.time()  # time of the last ETA recalculation
        self.etadisp = self.format_time(self.eta)
        self.last_progress = 0
        if self.expected_size:
            self.show(0)

    def _write_bar(self, filled, progress, time_disp):
        """Write one rendering of the bar to ``STREAM`` (without flushing)."""
        STREAM.write(
            BAR_TEMPLATE
            % (
                self.label,
                self.filled_char * filled,
                self.empty_char * (self.width - filled),
                progress,
                self.expected_size,
                time_disp,
            )
        )

    def show(self, progress, count=None):
        """Record ``progress`` and redraw the bar if it is due.

        Args:
            progress: Number of steps completed so far.
            count: Optional new total; replaces ``expected_size`` when given.

        Raises:
            Exception: If no expected size has been set, neither at
                construction time nor through ``count``.
        """
        if count is not None:
            self.expected_size = count
        if self.expected_size is None:
            raise Exception("expected_size not initialized")
        self.last_progress = progress

        now = time.time()
        if (now - self.etadelta) > ETA_INTERVAL:
            self.etadelta = now
            # Keep the previous samples and append the current average
            # time per item.
            self.ittimes = self.ittimes[-ETA_SMA_WINDOW:] + [
                (now - self.start) / (progress + 1)
            ]
            remaining = self.expected_size - progress
            average = sum(self.ittimes) / float(len(self.ittimes))
            # Clamp at zero: when progress overruns the total the estimate
            # would go negative, which is meaningless and may fall outside
            # the range time.gmtime() accepts on some platforms.
            self.eta = max(0, average * remaining)
            self.etadisp = self.format_time(self.eta)

        if self.expected_size:
            filled = int(self.width * progress / self.expected_size)
            # Keep the drawing inside the brackets even if progress is out
            # of the [0, expected_size] range.
            filled = max(0, min(self.width, filled))
        else:
            # A total of zero means there is nothing left to do; draw the
            # bar as complete instead of dividing by zero.
            filled = self.width

        if self.hide:
            return
        # Redraw every "every" updates, and when we're done.
        if (progress % self.every) == 0 or progress == self.expected_size:
            self._write_bar(filled, progress, self.etadisp)
            STREAM.flush()

    def done(self):
        """Finish the bar: store ``elapsed`` and draw the completed bar.

        Nothing is written when the bar is hidden or when no expected size
        was ever provided (there is no total to render in that case).
        """
        self.elapsed = time.time() - self.start
        elapsed_disp = self.format_time(self.elapsed)
        if self.hide or self.expected_size is None:
            return
        # Print completed bar with elapsed time
        self._write_bar(self.width, self.last_progress, elapsed_disp)
        STREAM.write("\n")
        STREAM.flush()

    def format_time(self, seconds):
        """Return ``seconds`` formatted as ``HH:MM:SS``."""
        return time.strftime("%H:%M:%S", time.gmtime(seconds))


def bar(
    it,
    label="",
    width=32,
    hide=None,
    empty_char=BAR_EMPTY_CHAR,
    filled_char=BAR_FILLED_CHAR,
    expected_size=None,
    every=1,
):
    """Progress iterator. Wrap your iterables with it.

    Yields the items of ``it`` unchanged while drawing a :class:`Bar`.
    ``len(it)`` is used as the total unless ``expected_size`` is given, so
    iterables without a length need an explicit ``expected_size``. The
    remaining arguments are passed through to :class:`Bar`.
    """

    count = len(it) if expected_size is None else expected_size

    with Bar(
        label=label,
        width=width,
        hide=hide,
        empty_char=empty_char,
        filled_char=filled_char,
        expected_size=count,
        every=every,
    ) as progress_bar:
        for done_count, item in enumerate(it, 1):
            yield item
            progress_bar.show(done_count)


def dots(it, label="", hide=None, every=1):
    """Progress iterator. Prints a dot for each item being iterated.

    Args:
        it: Iterable whose items are yielded unchanged.
        label: Text written once before the first dot.
        hide: When truthy, nothing at all is written. The default ``None``
            is falsy, so the dots are shown.
        every: A dot is written for every ``every``-th item only.
    """

    if not hide:
        STREAM.write(label)

    for i, item in enumerate(it):
        if not hide and i % every == 0:  # True every "every" updates
            STREAM.write(DOTS_CHAR)
            STREAM.flush()

        yield item

    if not hide:
        STREAM.write("\n")
        STREAM.flush()


def mill(it, label="", hide=None, expected_size=None, every=1):
    """Progress iterator. Prints a mill while iterating over the items.

    Args:
        it: Iterable whose items are yielded unchanged.
        label: Text written in front of the mill.
        hide: When truthy, nothing at all is written. The default ``None``
            is falsy, so the mill is shown.
        expected_size: Total number of items; defaults to ``len(it)``.
        every: The mill is redrawn (and advances) every ``every`` items.
    """

    def _mill_char(_i):
        # Once the total is reached the spinner is replaced by a blank.
        if _i >= count:
            return " "
        return MILL_CHARS[(_i // every) % len(MILL_CHARS)]

    def _show(_i):
        if hide:
            return
        # Redraw every "every" updates, and when we're done.
        if (_i % every) == 0 or _i == count:
            STREAM.write(MILL_TEMPLATE % (label, _mill_char(_i), _i, count))
            STREAM.flush()

    count = len(it) if expected_size is None else expected_size

    if count:
        _show(0)

    for i, item in enumerate(it):
        yield item
        _show(i + 1)

    if not hide:
        STREAM.write("\n")
        STREAM.flush()