docker.py (6731B)
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import os
import re
import sys
from collections.abc import Mapping
from urllib.parse import quote, urlencode, urlunparse

import requests
import requests_unixsocket
from mozbuild.util import memoize
from taskgraph.util import json
from taskgraph.util.yaml import load_yaml

from .. import GECKO

IMAGE_DIR = os.path.join(GECKO, "taskcluster", "docker")


def docker_url(path, **kwargs):
    """Build an ``http+unix`` URL addressing the docker daemon socket.

    The socket path is taken from the ``DOCKER_SOCKET`` environment variable,
    defaulting to ``/var/run/docker.sock``, and is percent-encoded into the
    host part of the URL. ``path`` becomes the URL path and the keyword
    arguments are url-encoded into the query string.
    """
    docker_socket = os.environ.get("DOCKER_SOCKET", "/var/run/docker.sock")
    return urlunparse((
        "http+unix",
        quote(docker_socket, safe=""),
        path,
        "",
        urlencode(kwargs),
        "",
    ))


def _docker_error_message(response):
    """Return a human readable message for a failed docker API response.

    Uses the ``message`` field of the JSON body when there is one, and falls
    back to a generic message carrying the HTTP status code when the body is
    not JSON, is not a JSON object, or has no (or an empty) ``message``.
    """
    try:
        message = response.json().get("message")
    except (ValueError, AttributeError):
        # ValueError: the body is not valid JSON.
        # AttributeError: the body is JSON but not an object (no ``.get``).
        message = None
    return message or f"docker API returned HTTP code {response.status_code}"


def _iter_docker_messages(chunks):
    """Yield the decoded JSON messages found in an iterable of byte chunks.

    A chunk is not guaranteed to line up with a message: it may hold only
    part of one, or several newline-separated ones. Data is accumulated
    until it can be decoded, either as a single JSON document or as a series
    of complete newline-terminated JSON documents.
    """
    buf = b""
    for content in chunks:
        if not content:
            continue
        # Sometimes, a chunk of content is not a complete json, so we cumulate
        # with leftovers from previous iterations.
        buf += content
        try:
            messages = [json.loads(buf)]
        except Exception:
            # Either the message is still incomplete, or the buffer holds
            # more than one message. Try the complete lines one by one and
            # keep whatever follows the last newline for the next iteration.
            *complete, remainder = buf.split(b"\n")
            try:
                messages = [json.loads(line) for line in complete if line.strip()]
            except Exception:
                # Not line-delimited after all (e.g. a document spanning
                # several lines that has not fully arrived): keep cumulating.
                continue
            buf = remainder
        else:
            buf = b""
        yield from messages

    # A last message without a trailing newline may have been left behind by
    # the line-based path above. Anything that still doesn't decode once the
    # stream has ended is dropped.
    if buf.strip():
        try:
            trailing = json.loads(buf)
        except Exception:
            return
        yield trailing


def _report_docker_message(data, status_line):
    """Write one docker API message to stderr, mimicking docker's own output.

    ``status_line`` is the mutable bookkeeping dict shared between calls. On
    a tty it maps layer ids to the line they were first printed on; otherwise
    it maps layer ids to the last status printed for them.

    Exits the process (``sys.exit``) on an ``error`` message, and raises
    ``NotImplementedError`` for messages of an unknown shape.
    """
    # Mimick how docker itself presents the output. This code was tested
    # with API version 1.18 and 1.26.
    if "status" in data:
        if "id" in data:
            if sys.stderr.isatty():
                total_lines = len(status_line)
                line = status_line.setdefault(data["id"], total_lines)
                n = total_lines - line
                if n > 0:
                    # Move the cursor up n lines.
                    sys.stderr.write(f"\033[{n}A")
                # Clear line and move the cursor to the beginning of it.
                sys.stderr.write("\033[2K\r")
                sys.stderr.write(
                    "{}: {} {}\n".format(
                        data["id"], data["status"], data.get("progress", "")
                    )
                )
                if n > 1:
                    # Move the cursor down n - 1 lines, which, considering
                    # the carriage return on the last write, gets us back
                    # where we started.
                    sys.stderr.write(f"\033[{n - 1}B")
            else:
                status = status_line.get(data["id"])
                # Only print status changes.
                if status != data["status"]:
                    sys.stderr.write("{}: {}\n".format(data["id"], data["status"]))
                    status_line[data["id"]] = data["status"]
        else:
            # A status without an id starts a new section of output: forget
            # what was tracked so far.
            status_line.clear()
            sys.stderr.write("{}\n".format(data["status"]))
    elif "stream" in data:
        sys.stderr.write(data["stream"])
    elif "aux" in data:
        sys.stderr.write(repr(data["aux"]))
    elif "error" in data:
        sys.stderr.write("{}\n".format(data["error"]))
        # Sadly, docker doesn't give more than a plain string for errors,
        # so the best we can do to propagate the error code from the command
        # that failed is to parse the error message...
        errcode = 1
        m = re.search(r"returned a non-zero code: (\d+)", data["error"])
        if m:
            errcode = int(m.group(1))
        sys.exit(errcode)
    else:
        raise NotImplementedError(repr(data))


def post_to_docker(tar, api_path, **kwargs):
    """POSTs a tar file to a given docker API path.

    The tar argument can be anything that can be passed to requests.post()
    as data (e.g. iterator or file object).
    The extra keyword arguments are passed as arguments to the docker API.

    The JSON messages streamed back by docker are echoed to stderr as they
    arrive.

    Raises ``Exception`` when docker answers with a status other than 200,
    exits the process through ``sys.exit`` when docker reports an error in
    the stream, and raises ``NotImplementedError`` on a message of an
    unknown shape.
    """
    # requests-unixsocket doesn't honor requests timeouts
    # See https://github.com/msabramo/requests-unixsocket/issues/44
    # We have some large docker images that trigger the default timeout,
    # so we increase the requests-unixsocket timeout here.
    #
    # The session is used as a context manager so that its connections are
    # released however this function is left (return, exception, sys.exit).
    # The response is streamed, so it has to be consumed inside the block.
    with requests.Session() as session:
        session.mount(
            requests_unixsocket.DEFAULT_SCHEME,
            requests_unixsocket.UnixAdapter(timeout=120),
        )
        req = session.post(
            docker_url(api_path, **kwargs),
            data=tar,
            stream=True,
            headers={"Content-Type": "application/x-tar"},
        )
        if req.status_code != 200:
            raise Exception(_docker_error_message(req))

        status_line = {}
        for data in _iter_docker_messages(req.iter_content(chunk_size=None)):
            # data is sometimes an empty dict.
            if not data:
                continue
            _report_docker_message(data, status_line)
            sys.stderr.flush()


class ImagePathsMap(Mapping):
    """ImagePathsMap contains the mapping of Docker image names to their
    context location in the filesystem. The register function allows Thunderbird
    to define additional images under comm/taskcluster.

    The entries are stored directly in the instance ``__dict__``.
    """

    def __init__(self, config_path, image_dir=IMAGE_DIR):
        # config_path is resolved by load_yaml relative to GECKO; the image
        # definitions live under the "tasks" key of that file.
        config = load_yaml(GECKO, config_path)
        self.__update_image_paths(config["tasks"], image_dir)

    def __getitem__(self, key):
        return self.__dict__[key]

    def __iter__(self):
        return iter(self.__dict__)

    def __len__(self):
        return len(self.__dict__)

    def __update_image_paths(self, jobs, image_dir):
        # Each image maps to <image_dir>/<definition>, where the definition
        # defaults to the image name itself.
        self.__dict__.update({
            k: os.path.join(image_dir, v.get("definition", k)) for k, v in jobs.items()
        })

    def register(self, jobs_config_path, image_dir):
        """Register additional image_paths.

        Unlike the file given to the constructor, the loaded YAML is used
        as-is as the mapping of image names to their configuration: there is
        no enclosing 'tasks' key, as this file is loaded via jobs-from in
        kind.yml.
        """
        jobs = load_yaml(GECKO, jobs_config_path)
        self.__update_image_paths(jobs, image_dir)


image_paths = ImagePathsMap("taskcluster/kinds/docker-image/kind.yml")


def image_path(name):
    """Return the context directory of the docker image called ``name``.

    Images that are not known to ``image_paths`` are assumed to live in a
    directory named after them under ``IMAGE_DIR``.
    """
    try:
        return image_paths[name]
    except KeyError:
        return os.path.join(IMAGE_DIR, name)


@memoize
def parse_volumes(image):
    """Parse VOLUME entries from a Dockerfile for an image.

    Returns the set of volume paths declared by the ``VOLUME`` instructions
    of the image's Dockerfile. Raises ``ValueError`` when an instruction uses
    the JSON array syntax, which is not supported.
    """
    volumes = set()

    path = image_path(image)

    with open(os.path.join(path, "Dockerfile"), "rb") as fh:
        for line in fh:
            line = line.strip()
            # We assume VOLUME definitions don't use ARGS.
            if not line.startswith(b"VOLUME "):
                continue

            v = line.split(None, 1)[1]
            if v.startswith(b"["):
                raise ValueError(
                    "cannot parse array syntax for VOLUME; convert to multiple entries"
                )

            volumes |= {entry.decode("utf-8") for entry in v.split()}

    return volumes