tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

upload_generated_sources.py (5409B)


#!/usr/bin/env python
      2 # This Source Code Form is subject to the terms of the Mozilla Public
      3 # License, v. 2.0. If a copy of the MPL was not distributed with this
      4 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
      5 
      6 import argparse
      7 import gzip
      8 import io
      9 import logging
     10 import os
     11 import sys
     12 import tarfile
     13 import time
     14 from contextlib import contextmanager
     15 from queue import Queue
     16 from threading import Event, Thread
     17 
     18 import requests
     19 from mozbuild.generated_sources import (
     20    get_filename_with_digest,
     21    get_s3_region_and_bucket,
     22 )
     23 from requests.packages.urllib3.util.retry import Retry
     24 
     25 # Arbitrary, should probably measure this.
     26 NUM_WORKER_THREADS = 10
     27 log = logging.getLogger("upload-generated-sources")
     28 log.setLevel(logging.INFO)
     29 
     30 
     31 @contextmanager
     32 def timed():
     33    """
     34    Yield a function that provides the elapsed time in seconds since this
     35    function was called.
     36    """
     37    start = time.time()
     38 
     39    def elapsed():
     40        return time.time() - start
     41 
     42    yield elapsed
     43 
     44 
     45 def gzip_compress(data):
     46    """
     47    Apply gzip compression to `data` and return the result as a `BytesIO`.
     48    """
     49    b = io.BytesIO()
     50    with gzip.GzipFile(fileobj=b, mode="w") as f:
     51        f.write(data)
     52    b.flush()
     53    b.seek(0)
     54    return b
     55 
     56 
def upload_worker(queue, event, bucket, session_args):
    """
    Get `(name, contents)` entries from `queue` and upload `contents`
    to S3 with gzip compression using `name` as the key, prefixed with
    the SHA-512 digest of `contents` as a hex string. If an exception occurs,
    set `event`.

    Intended to run as the body of a daemon thread: it loops forever and
    only returns early when `event` is set by a failing worker.
    """
    try:
        # Imported lazily so merely loading this module does not require
        # boto3 to be installed.
        import boto3

        session = boto3.session.Session(**session_args)
        s3 = session.client("s3")
        while True:
            if event.is_set():
                # Some other thread hit an exception.
                return
            # Blocks until an item is available. NOTE(review): once the
            # producer stops, idle workers stay blocked here; the process
            # can still exit because the threads are started as daemons.
            (name, contents) = queue.get()
            # Key is "<sha512-hex>/<name>" per get_filename_with_digest.
            pathname = get_filename_with_digest(name, contents)
            compressed = gzip_compress(contents)
            extra_args = {
                "ContentEncoding": "gzip",
                "ContentType": "text/plain",
            }
            log.info(f'Uploading "{pathname}" ({len(compressed.getvalue())} bytes)')
            with timed() as elapsed:
                s3.upload_fileobj(compressed, bucket, pathname, ExtraArgs=extra_args)
                log.info(f'Finished uploading "{pathname}" in {elapsed():0.3f}s')
            # Only decrement the queue's unfinished-task count after a
            # successful upload; on failure `event` reports the error to
            # do_work's monitoring loop instead.
            queue.task_done()
    except Exception:
        log.exception("Thread encountered exception:")
        event.set()
     88 
     89 
def do_work(artifact, region, bucket):
    """
    Download the generated-sources tarball at URL `artifact` and upload
    each member file to S3 `bucket` in `region` via a pool of worker
    threads.

    Exits the process with status 1 if any worker thread raises.
    """
    session_args = {"region_name": region}
    session = requests.Session()
    # Retry transient server errors for both the secrets fetch and the
    # artifact download.
    retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
    http_adapter = requests.adapters.HTTPAdapter(max_retries=retry)
    session.mount("https://", http_adapter)
    session.mount("http://", http_adapter)

    if "TASK_ID" in os.environ:
        # Running inside Taskcluster: fetch AWS credentials from the
        # cluster-internal secrets service, scoped by SCM level.
        level = os.environ.get("MOZ_SCM_LEVEL", "1")
        secrets_url = "http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload".format(  # noqa
            level
        )
        log.info(f'Using AWS credentials from the secrets service: "{secrets_url}"')
        res = session.get(secrets_url)
        res.raise_for_status()
        secret = res.json()
        session_args.update(
            aws_access_key_id=secret["secret"]["AWS_ACCESS_KEY_ID"],
            aws_secret_access_key=secret["secret"]["AWS_SECRET_ACCESS_KEY"],
        )
    else:
        # Local run: fall back to boto3's default credential chain.
        log.info("Trying to use your AWS credentials..")

    # First, fetch the artifact containing the sources.
    log.info(f'Fetching generated sources artifact: "{artifact}"')
    with timed() as elapsed:
        res = session.get(artifact)
        log.info(
            f"Fetch HTTP status: {res.status_code}, {len(res.content)} bytes downloaded in {elapsed():0.3f}s"
        )
    res.raise_for_status()
    # Create a queue and worker threads for uploading.
    q = Queue()
    event = Event()
    log.info(f"Creating {NUM_WORKER_THREADS} worker threads")
    for i in range(NUM_WORKER_THREADS):
        t = Thread(target=upload_worker, args=(q, event, bucket, session_args))
        # Daemon threads: workers blocked on q.get() must not keep the
        # interpreter alive after main exits.
        t.daemon = True
        t.start()
    # "r|gz" streams the tarball sequentially without requiring seeks.
    with tarfile.open(fileobj=io.BytesIO(res.content), mode="r|gz") as tar:
        # Next, process each file.
        for entry in tar:
            if event.is_set():
                # A worker already failed; stop queueing more work.
                break
            log.info(f'Queueing "{entry.name}"')
            q.put((entry.name, tar.extractfile(entry).read()))
    # Wait until all uploads are finished.
    # We don't use q.join() here because we want to also monitor event.
    while q.unfinished_tasks:
        if event.wait(0.1):
            log.error("Worker thread encountered exception, exiting...")
            sys.exit(1)
    143 
    144 
    145 def main(argv):
    146    logging.basicConfig(format="%(levelname)s - %(threadName)s - %(message)s")
    147    parser = argparse.ArgumentParser(
    148        description="Upload generated source files in ARTIFACT to BUCKET in S3."
    149    )
    150    parser.add_argument("artifact", help="generated-sources artifact from build task")
    151    args = parser.parse_args(argv)
    152    region, bucket = get_s3_region_and_bucket()
    153 
    154    with timed() as elapsed:
    155        do_work(region=region, bucket=bucket, artifact=args.artifact)
    156        log.info(f"Finished in {elapsed():.03f}s")
    157    return 0
    158 
    159 
    160 if __name__ == "__main__":
    161    sys.exit(main(sys.argv[1:]))