upload_generated_sources.py (5409B)
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""Upload a build task's generated-sources artifact to S3.

Downloads the gzipped tarball named on the command line, then uploads
each member to the configured S3 bucket (gzip-compressed, keyed by a
SHA-512-digest-prefixed filename) using a pool of worker threads.
"""

import argparse
import gzip
import io
import logging
import os
import sys
import tarfile
import time
from contextlib import contextmanager
from queue import Queue
from threading import Event, Thread

import requests
from mozbuild.generated_sources import (
    get_filename_with_digest,
    get_s3_region_and_bucket,
)
from requests.packages.urllib3.util.retry import Retry

# Arbitrary, should probably measure this.
NUM_WORKER_THREADS = 10
log = logging.getLogger("upload-generated-sources")
log.setLevel(logging.INFO)


@contextmanager
def timed():
    """
    Yield a function that provides the elapsed time in seconds since this
    function was called.
    """
    start = time.time()

    def elapsed():
        return time.time() - start

    yield elapsed


def gzip_compress(data):
    """
    Apply gzip compression to `data` and return the result as a `BytesIO`
    positioned at the start of the compressed stream.
    """
    b = io.BytesIO()
    # Closing the GzipFile (via the context manager) flushes the final
    # gzip trailer into `b`; no explicit flush is needed afterwards.
    with gzip.GzipFile(fileobj=b, mode="w") as f:
        f.write(data)
    b.seek(0)
    return b


def upload_worker(queue, event, bucket, session_args):
    """
    Get `(name, contents)` entries from `queue` and upload `contents`
    to S3 with gzip compression using `name` as the key, prefixed with
    the SHA-512 digest of `contents` as a hex string. If an exception occurs,
    set `event`.
    """
    try:
        # boto3 is imported lazily so only worker threads pay the cost.
        import boto3

        session = boto3.session.Session(**session_args)
        s3 = session.client("s3")
        while True:
            if event.is_set():
                # Some other thread hit an exception.
                return
            (name, contents) = queue.get()
            pathname = get_filename_with_digest(name, contents)
            compressed = gzip_compress(contents)
            extra_args = {
                "ContentEncoding": "gzip",
                "ContentType": "text/plain",
            }
            # getbuffer().nbytes reports the size without copying the
            # whole compressed payload the way getvalue() would.
            log.info(f'Uploading "{pathname}" ({compressed.getbuffer().nbytes} bytes)')
            with timed() as elapsed:
                s3.upload_fileobj(compressed, bucket, pathname, ExtraArgs=extra_args)
                log.info(f'Finished uploading "{pathname}" in {elapsed():0.3f}s')
            queue.task_done()
    except Exception:
        log.exception("Thread encountered exception:")
        # Signal the main thread (and siblings) to bail out.
        event.set()


def do_work(artifact, region, bucket):
    """
    Fetch the generated-sources tarball at the URL `artifact` and upload
    each member to S3 `bucket` in `region` via worker threads.

    Exits the process with status 1 if any worker thread raises.
    """
    session_args = {"region_name": region}
    session = requests.Session()
    # Retry transient server errors when talking to the secrets service
    # and when fetching the artifact.
    retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
    http_adapter = requests.adapters.HTTPAdapter(max_retries=retry)
    session.mount("https://", http_adapter)
    session.mount("http://", http_adapter)

    if "TASK_ID" in os.environ:
        # Running in Taskcluster: fetch AWS credentials from the secrets
        # service at the appropriate SCM level.
        level = os.environ.get("MOZ_SCM_LEVEL", "1")
        secrets_url = "http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload".format(  # noqa
            level
        )
        log.info(f'Using AWS credentials from the secrets service: "{secrets_url}"')
        res = session.get(secrets_url)
        res.raise_for_status()
        secret = res.json()
        session_args.update(
            aws_access_key_id=secret["secret"]["AWS_ACCESS_KEY_ID"],
            aws_secret_access_key=secret["secret"]["AWS_SECRET_ACCESS_KEY"],
        )
    else:
        # Local run: let boto3 find credentials in the environment.
        log.info("Trying to use your AWS credentials..")

    # First, fetch the artifact containing the sources.
    log.info(f'Fetching generated sources artifact: "{artifact}"')
    with timed() as elapsed:
        res = session.get(artifact)
        log.info(
            f"Fetch HTTP status: {res.status_code}, {len(res.content)} bytes downloaded in {elapsed():0.3f}s"
        )
    res.raise_for_status()
    # Create a queue and worker threads for uploading.
    q = Queue()
    event = Event()
    log.info(f"Creating {NUM_WORKER_THREADS} worker threads")
    for i in range(NUM_WORKER_THREADS):
        t = Thread(target=upload_worker, args=(q, event, bucket, session_args))
        t.daemon = True
        t.start()
    with tarfile.open(fileobj=io.BytesIO(res.content), mode="r|gz") as tar:
        # Next, process each file.
        for entry in tar:
            if event.is_set():
                break
            log.info(f'Queueing "{entry.name}"')
            q.put((entry.name, tar.extractfile(entry).read()))
    # Wait until all uploads are finished.
    # We don't use q.join() here because we want to also monitor event.
    while q.unfinished_tasks:
        if event.wait(0.1):
            log.error("Worker thread encountered exception, exiting...")
            sys.exit(1)


def main(argv):
    """
    Parse `argv`, determine the S3 destination, and run the upload.

    Returns 0 on success (worker failures exit the process from do_work).
    """
    logging.basicConfig(format="%(levelname)s - %(threadName)s - %(message)s")
    parser = argparse.ArgumentParser(
        description="Upload generated source files in ARTIFACT to BUCKET in S3."
    )
    parser.add_argument("artifact", help="generated-sources artifact from build task")
    args = parser.parse_args(argv)
    region, bucket = get_s3_region_and_bucket()

    with timed() as elapsed:
        do_work(region=region, bucket=bucket, artifact=args.artifact)
        log.info(f"Finished in {elapsed():.03f}s")
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))