test_zipp.py (7322B)
1 # coding: utf-8 2 3 from __future__ import division, unicode_literals 4 5 import io 6 import zipfile 7 import contextlib 8 import tempfile 9 import shutil 10 import string 11 12 try: 13 import pathlib 14 except ImportError: 15 import pathlib2 as pathlib 16 17 if not hasattr(contextlib, 'ExitStack'): 18 import contextlib2 19 contextlib.ExitStack = contextlib2.ExitStack 20 21 try: 22 import unittest 23 24 unittest.TestCase.subTest 25 except AttributeError: 26 import unittest2 as unittest 27 28 import jaraco.itertools 29 import func_timeout 30 31 import zipp 32 33 __metaclass__ = type 34 consume = tuple 35 36 37 def add_dirs(zf): 38 """ 39 Given a writable zip file zf, inject directory entries for 40 any directories implied by the presence of children. 41 """ 42 for name in zipp.CompleteDirs._implied_dirs(zf.namelist()): 43 zf.writestr(name, b"") 44 return zf 45 46 47 def build_alpharep_fixture(): 48 """ 49 Create a zip file with this structure: 50 51 . 52 ├── a.txt 53 ├── b 54 │ ├── c.txt 55 │ ├── d 56 │ │ └── e.txt 57 │ └── f.txt 58 └── g 59 └── h 60 └── i.txt 61 62 This fixture has the following key characteristics: 63 64 - a file at the root (a) 65 - a file two levels deep (b/d/e) 66 - multiple files in a directory (b/c, b/f) 67 - a directory containing only a directory (g/h) 68 69 "alpha" because it uses alphabet 70 "rep" because it's a representative example 71 """ 72 data = io.BytesIO() 73 zf = zipfile.ZipFile(data, "w") 74 zf.writestr("a.txt", b"content of a") 75 zf.writestr("b/c.txt", b"content of c") 76 zf.writestr("b/d/e.txt", b"content of e") 77 zf.writestr("b/f.txt", b"content of f") 78 zf.writestr("g/h/i.txt", b"content of i") 79 zf.filename = "alpharep.zip" 80 return zf 81 82 83 @contextlib.contextmanager 84 def temp_dir(): 85 tmpdir = tempfile.mkdtemp() 86 try: 87 yield pathlib.Path(tmpdir) 88 finally: 89 shutil.rmtree(tmpdir) 90 91 92 class TestPath(unittest.TestCase): 93 def setUp(self): 94 self.fixtures = contextlib.ExitStack() 95 self.addCleanup(self.fixtures.close) 96 97 def zipfile_alpharep(self): 98 with self.subTest(): 99 yield build_alpharep_fixture() 100 with self.subTest(): 101 yield add_dirs(build_alpharep_fixture()) 102 103 def zipfile_ondisk(self): 104 tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir())) 105 for alpharep in self.zipfile_alpharep(): 106 buffer = alpharep.fp 107 alpharep.close() 108 path = tmpdir / alpharep.filename 109 with path.open("wb") as strm: 110 strm.write(buffer.getvalue()) 111 yield path 112 113 def test_iterdir_and_types(self): 114 for alpharep in self.zipfile_alpharep(): 115 root = zipp.Path(alpharep) 116 assert root.is_dir() 117 a, b, g = root.iterdir() 118 assert a.is_file() 119 assert b.is_dir() 120 assert g.is_dir() 121 c, f, d = b.iterdir() 122 assert c.is_file() and f.is_file() 123 e, = d.iterdir() 124 assert e.is_file() 125 h, = g.iterdir() 126 i, = h.iterdir() 127 assert i.is_file() 128 129 def test_open(self): 130 for alpharep in self.zipfile_alpharep(): 131 root = zipp.Path(alpharep) 132 a, b, g = root.iterdir() 133 with a.open() as strm: 134 data = strm.read() 135 assert data == "content of a" 136 137 def test_read(self): 138 for alpharep in self.zipfile_alpharep(): 139 root = zipp.Path(alpharep) 140 a, b, g = root.iterdir() 141 assert a.read_text() == "content of a" 142 assert a.read_bytes() == b"content of a" 143 144 def test_joinpath(self): 145 for alpharep in self.zipfile_alpharep(): 146 root = zipp.Path(alpharep) 147 a = root.joinpath("a") 148 assert a.is_file() 149 e = root.joinpath("b").joinpath("d").joinpath("e.txt") 150 assert e.read_text() == "content of e" 151 152 def test_traverse_truediv(self): 153 for alpharep in self.zipfile_alpharep(): 154 root = zipp.Path(alpharep) 155 a = root / "a" 156 assert a.is_file() 157 e = root / "b" / "d" / "e.txt" 158 assert e.read_text() == "content of e" 159 160 def test_traverse_simplediv(self): 161 """ 162 Disable the __future__.division when testing traversal. 163 """ 164 for alpharep in self.zipfile_alpharep(): 165 code = compile( 166 source="zipp.Path(alpharep) / 'a'", 167 filename="(test)", 168 mode="eval", 169 dont_inherit=True, 170 ) 171 eval(code) 172 173 def test_pathlike_construction(self): 174 """ 175 zipp.Path should be constructable from a path-like object 176 """ 177 for zipfile_ondisk in self.zipfile_ondisk(): 178 pathlike = pathlib.Path(str(zipfile_ondisk)) 179 zipp.Path(pathlike) 180 181 def test_traverse_pathlike(self): 182 for alpharep in self.zipfile_alpharep(): 183 root = zipp.Path(alpharep) 184 root / pathlib.Path("a") 185 186 def test_parent(self): 187 for alpharep in self.zipfile_alpharep(): 188 root = zipp.Path(alpharep) 189 assert (root / 'a').parent.at == '' 190 assert (root / 'a' / 'b').parent.at == 'a/' 191 192 def test_dir_parent(self): 193 for alpharep in self.zipfile_alpharep(): 194 root = zipp.Path(alpharep) 195 assert (root / 'b').parent.at == '' 196 assert (root / 'b/').parent.at == '' 197 198 def test_missing_dir_parent(self): 199 for alpharep in self.zipfile_alpharep(): 200 root = zipp.Path(alpharep) 201 assert (root / 'missing dir/').parent.at == '' 202 203 def test_mutability(self): 204 """ 205 If the underlying zipfile is changed, the Path object should 206 reflect that change. 207 """ 208 for alpharep in self.zipfile_alpharep(): 209 root = zipp.Path(alpharep) 210 a, b, g = root.iterdir() 211 alpharep.writestr('foo.txt', b'foo') 212 alpharep.writestr('bar/baz.txt', b'baz') 213 assert any( 214 child.name == 'foo.txt' 215 for child in root.iterdir()) 216 assert (root / 'foo.txt').read_text() == 'foo' 217 baz, = (root / 'bar').iterdir() 218 assert baz.read_text() == 'baz' 219 220 HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13 221 222 def huge_zipfile(self): 223 """Create a read-only zipfile with a huge number of entries entries.""" 224 strm = io.BytesIO() 225 zf = zipfile.ZipFile(strm, "w") 226 for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)): 227 zf.writestr(entry, entry) 228 zf.mode = 'r' 229 return zf 230 231 def test_joinpath_constant_time(self): 232 """ 233 Ensure joinpath on items in zipfile is linear time. 234 """ 235 root = zipp.Path(self.huge_zipfile()) 236 entries = jaraco.itertools.Counter(root.iterdir()) 237 for entry in entries: 238 entry.joinpath('suffix') 239 # Check the file iterated all items 240 assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES 241 242 @func_timeout.func_set_timeout(3) 243 def test_implied_dirs_performance(self): 244 data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)] 245 zipp.CompleteDirs._implied_dirs(data)