zipp.py (7039B)
1 # coding: utf-8 2 3 from __future__ import division 4 5 import io 6 import sys 7 import posixpath 8 import zipfile 9 import functools 10 import itertools 11 from collections import OrderedDict 12 13 try: 14 from contextlib import suppress 15 except ImportError: 16 from contextlib2 import suppress 17 18 __metaclass__ = type 19 20 21 def _parents(path): 22 """ 23 Given a path with elements separated by 24 posixpath.sep, generate all parents of that path. 25 26 >>> list(_parents('b/d')) 27 ['b'] 28 >>> list(_parents('/b/d/')) 29 ['/b'] 30 >>> list(_parents('b/d/f/')) 31 ['b/d', 'b'] 32 >>> list(_parents('b')) 33 [] 34 >>> list(_parents('')) 35 [] 36 """ 37 return itertools.islice(_ancestry(path), 1, None) 38 39 40 def _ancestry(path): 41 """ 42 Given a path with elements separated by 43 posixpath.sep, generate all elements of that path 44 45 >>> list(_ancestry('b/d')) 46 ['b/d', 'b'] 47 >>> list(_ancestry('/b/d/')) 48 ['/b/d', '/b'] 49 >>> list(_ancestry('b/d/f/')) 50 ['b/d/f', 'b/d', 'b'] 51 >>> list(_ancestry('b')) 52 ['b'] 53 >>> list(_ancestry('')) 54 [] 55 """ 56 path = path.rstrip(posixpath.sep) 57 while path and path != posixpath.sep: 58 yield path 59 path, tail = posixpath.split(path) 60 61 62 class CompleteDirs(zipfile.ZipFile): 63 """ 64 A ZipFile subclass that ensures that implied directories 65 are always included in the namelist. 66 """ 67 68 @staticmethod 69 def _implied_dirs(names): 70 parents = itertools.chain.from_iterable(map(_parents, names)) 71 # Cast names to a set for O(1) lookups 72 existing = set(names) 73 # Deduplicate entries in original order 74 implied_dirs = OrderedDict.fromkeys( 75 p + posixpath.sep for p in parents 76 if p + posixpath.sep not in existing 77 ) 78 return implied_dirs 79 80 def namelist(self): 81 names = super(CompleteDirs, self).namelist() 82 return names + list(self._implied_dirs(names)) 83 84 def _name_set(self): 85 return set(self.namelist()) 86 87 def resolve_dir(self, name): 88 """ 89 If the name represents a directory, return that name 90 as a directory (with the trailing slash). 91 """ 92 names = self._name_set() 93 dirname = name + '/' 94 dir_match = name not in names and dirname in names 95 return dirname if dir_match else name 96 97 @classmethod 98 def make(cls, source): 99 """ 100 Given a source (filename or zipfile), return an 101 appropriate CompleteDirs subclass. 102 """ 103 if isinstance(source, CompleteDirs): 104 return source 105 106 if not isinstance(source, zipfile.ZipFile): 107 return cls(_pathlib_compat(source)) 108 109 # Only allow for FastPath when supplied zipfile is read-only 110 if 'r' not in source.mode: 111 cls = CompleteDirs 112 113 res = cls.__new__(cls) 114 vars(res).update(vars(source)) 115 return res 116 117 118 class FastLookup(CompleteDirs): 119 """ 120 ZipFile subclass to ensure implicit 121 dirs exist and are resolved rapidly. 122 """ 123 def namelist(self): 124 with suppress(AttributeError): 125 return self.__names 126 self.__names = super(FastLookup, self).namelist() 127 return self.__names 128 129 def _name_set(self): 130 with suppress(AttributeError): 131 return self.__lookup 132 self.__lookup = super(FastLookup, self)._name_set() 133 return self.__lookup 134 135 136 def _pathlib_compat(path): 137 """ 138 For path-like objects, convert to a filename for compatibility 139 on Python 3.6.1 and earlier. 140 """ 141 try: 142 return path.__fspath__() 143 except AttributeError: 144 return str(path) 145 146 147 class Path: 148 """ 149 A pathlib-compatible interface for zip files. 150 151 Consider a zip file with this structure:: 152 153 . 154 ├── a.txt 155 └── b 156 ├── c.txt 157 └── d 158 └── e.txt 159 160 >>> data = io.BytesIO() 161 >>> zf = zipfile.ZipFile(data, 'w') 162 >>> zf.writestr('a.txt', 'content of a') 163 >>> zf.writestr('b/c.txt', 'content of c') 164 >>> zf.writestr('b/d/e.txt', 'content of e') 165 >>> zf.filename = 'abcde.zip' 166 167 Path accepts the zipfile object itself or a filename 168 169 >>> root = Path(zf) 170 171 From there, several path operations are available. 172 173 Directory iteration (including the zip file itself): 174 175 >>> a, b = root.iterdir() 176 >>> a 177 Path('abcde.zip', 'a.txt') 178 >>> b 179 Path('abcde.zip', 'b/') 180 181 name property: 182 183 >>> b.name 184 'b' 185 186 join with divide operator: 187 188 >>> c = b / 'c.txt' 189 >>> c 190 Path('abcde.zip', 'b/c.txt') 191 >>> c.name 192 'c.txt' 193 194 Read text: 195 196 >>> c.read_text() 197 'content of c' 198 199 existence: 200 201 >>> c.exists() 202 True 203 >>> (b / 'missing.txt').exists() 204 False 205 206 Coercion to string: 207 208 >>> str(c) 209 'abcde.zip/b/c.txt' 210 """ 211 212 __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})" 213 214 def __init__(self, root, at=""): 215 self.root = FastLookup.make(root) 216 self.at = at 217 218 def open(self, mode='r', *args, **kwargs): 219 """ 220 Open this entry as text or binary following the semantics 221 of ``pathlib.Path.open()`` by passing arguments through 222 to io.TextIOWrapper(). 223 """ 224 pwd = kwargs.pop('pwd', None) 225 zip_mode = mode[0] 226 stream = self.root.open(self.at, zip_mode, pwd=pwd) 227 if 'b' in mode: 228 if args or kwargs: 229 raise ValueError("encoding args invalid for binary operation") 230 return stream 231 return io.TextIOWrapper(stream, *args, **kwargs) 232 233 @property 234 def name(self): 235 return posixpath.basename(self.at.rstrip("/")) 236 237 def read_text(self, *args, **kwargs): 238 with self.open('r', *args, **kwargs) as strm: 239 return strm.read() 240 241 def read_bytes(self): 242 with self.open('rb') as strm: 243 return strm.read() 244 245 def _is_child(self, path): 246 return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/") 247 248 def _next(self, at): 249 return Path(self.root, at) 250 251 def is_dir(self): 252 return not self.at or self.at.endswith("/") 253 254 def is_file(self): 255 return not self.is_dir() 256 257 def exists(self): 258 return self.at in self.root._name_set() 259 260 def iterdir(self): 261 if not self.is_dir(): 262 raise ValueError("Can't listdir a file") 263 subs = map(self._next, self.root.namelist()) 264 return filter(self._is_child, subs) 265 266 def __str__(self): 267 return posixpath.join(self.root.filename, self.at) 268 269 def __repr__(self): 270 return self.__repr.format(self=self) 271 272 def joinpath(self, add): 273 next = posixpath.join(self.at, _pathlib_compat(add)) 274 return self._next(self.root.resolve_dir(next)) 275 276 __truediv__ = joinpath 277 278 @property 279 def parent(self): 280 parent_at = posixpath.dirname(self.at.rstrip('/')) 281 if parent_at: 282 parent_at += '/' 283 return self._next(parent_at) 284 285 if sys.version_info < (3,): 286 __div__ = __truediv__