tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

unittest.py (6683B)


      1 # Any copyright is dedicated to the Public Domain.
      2 # http://creativecommons.org/publicdomain/zero/1.0/
      3 
      4 # Unit test for the HttpPostFile plugin, using a server on localhost.
      5 #
      6 # This test has not been set up to run in continuous integration. It is
      7 # intended to be run manually, and only on Windows.
      8 #
      9 # Requires postdriver.exe, which can be built from postdriver.nsi with makensis
     10 # from MozillaBuild:
     11 #
     12 #   makensis-3.01.exe postdriver.nsi
     13 #
     14 # It can then be run from this directory as:
     15 #
     16 #   python3 unittest.py
     17 
     18 import os
     19 import subprocess
     20 import http.server
     21 import socketserver
     22 import threading
     23 
     24 DRIVER_EXE_FILE_NAME = "postdriver.exe"
     25 JSON_FILE_NAME = "test1.json"
     26 RESULT_FILE_NAME = "result.txt"
     27 BIND_HOST = "127.0.0.1"
     28 BIND_PORT = 8080
     29 COMMON_URL = f"http://{BIND_HOST}:{BIND_PORT}/submit"
     30 COMMON_JSON_BYTES = '{"yes": "indeed",\n"and": "ij"}'.encode('utf-8')
     31 
     32 DRIVER_TIMEOUT_SECS = 60
     33 SERVER_TIMEOUT_SECS = 120
     34 
     35 
class PostHandler(http.server.BaseHTTPRequestHandler):
    """Request handler whose POST behaviour is supplied at run time.

    The class defines no methods of its own. run_and_assert_result assigns the
    handler function for the current test to ``PostHandler.do_POST`` before
    the server handles a request.
    """
     38 
     39 
# Body of the most recent POST to /submit as read by server_accept_submit,
# or None if the last request was not a /submit POST.
last_submission = None
# Content-Type header of that POST, or None.
last_content_type = None
# Bytes that server_accept_submit writes back as the response body; the test
# script reassigns this to exercise empty and large responses.
server_response = 'Hello, plugin'.encode('utf-8')
     43 
     44 
     45 def server_accept_submit(handler):
     46    """Plugs into PostHandler.do_POST, accepts a POST on /submit and saves it into
     47    the globals"""
     48 
     49    global last_submission
     50    global last_content_type
     51    global server_response
     52 
     53    last_submission = None
     54    last_content_type = None
     55 
     56    if handler.path == "/submit":
     57        handler.send_response(200, 'Ok')
     58        content_length = int(handler.headers['Content-Length'])
     59        last_submission = handler.rfile.read(content_length)
     60        last_content_type = handler.headers['Content-Type']
     61    else:
     62        handler.send_response(404, 'Not found')
     63    handler.end_headers()
     64 
     65    handler.wfile.write(server_response)
     66    handler.wfile.flush()
     67 
     68    handler.log_message("sent response")
     69 
     70 
# Event that server_hang blocks on. The hang test assigns a threading.Event
# here, and run_and_assert_result sets it during cleanup so the server thread
# can finish. None when no hang test is running.
server_hang_event = None
     72 
     73 
     74 def server_hang(handler):
     75    """Plugs into PostHandler.do_POST, waits on server_hang_event or until timeout"""
     76    server_hang_event.wait(SERVER_TIMEOUT_SECS)
     77 
     78 
     79 def run_and_assert_result(handle_request, post_file, url, expected_result):
     80    """Sets up the server on another thread, runs the NSIS driver, and checks the result"""
     81    global last_submission
     82    global server_hang_event
     83 
     84    try:
     85        os.remove(RESULT_FILE_NAME)
     86    except FileNotFoundError:
     87        pass
     88 
     89    PostHandler.do_POST = handle_request
     90    last_submission = None
     91 
     92    def handler_thread():
     93        with socketserver.TCPServer((BIND_HOST, BIND_PORT), PostHandler) as httpd:
     94            httpd.timeout = SERVER_TIMEOUT_SECS
     95            httpd.handle_request()
     96 
     97    if handle_request:
     98        server_thread = threading.Thread(target=handler_thread)
     99        server_thread.start()
    100 
    101    try:
    102        subprocess.call([DRIVER_EXE_FILE_NAME, f'/postfile={post_file}', f'/url={url}',
    103                         f'/resultfile={RESULT_FILE_NAME}', '/S'], timeout=DRIVER_TIMEOUT_SECS)
    104 
    105        with open(RESULT_FILE_NAME, "r") as result_file:
    106            result = result_file.read()
    107 
    108            if result != expected_result:
    109                raise AssertionError(f'{result} != {expected_result}')
    110 
    111    finally:
    112        if server_hang_event:
    113            server_hang_event.set()
    114 
    115        if handle_request:
    116            server_thread.join()
    117        os.remove(RESULT_FILE_NAME)
    118 
    119 
    120 def create_json_file(json_bytes=COMMON_JSON_BYTES):
    121    with open(JSON_FILE_NAME, "wb") as outfile:
    122        outfile.write(json_bytes)
    123 
    124 
    125 def check_submission(json_bytes=COMMON_JSON_BYTES):
    126    if last_submission != json_bytes:
    127        raise AssertionError(f'{last_submission.hex()} != {COMMON_JSON_BYTES}')
    128 
    129 
    130 def cleanup_json_file():
    131    os.remove(JSON_FILE_NAME)
    132 
    133 
    134 # Tests begin here
    135 
    136 try:
    137    cleanup_json_file()
    138 except FileNotFoundError:
    139    pass
    140 
    141 # Basic test
    142 
    143 create_json_file()
    144 run_and_assert_result(server_accept_submit, JSON_FILE_NAME, COMMON_URL, "success")
    145 check_submission()
    146 assert last_content_type == 'application/json'
    147 cleanup_json_file()
    148 
    149 print("Basic test OK\n")
    150 
    151 # Test with missing file
    152 
    153 try:
    154    cleanup_json_file()
    155 except FileNotFoundError:
    156    pass
    157 
    158 run_and_assert_result(None, JSON_FILE_NAME, COMMON_URL, "error reading file")
    159 
    160 print("Missing file test OK\n")
    161 
    162 # Test with empty file
    163 
    164 create_json_file(bytes())
    165 run_and_assert_result(server_accept_submit, JSON_FILE_NAME, COMMON_URL, "success")
    166 check_submission(bytes())
    167 cleanup_json_file()
    168 
    169 print("Empty file test OK\n")
    170 
    171 # Test with large file
    172 
    173 # NOTE: Not actually JSON, but nothing here should care
    174 four_mbytes = bytes([x & 255 for x in range(4*1024*1024)])
    175 create_json_file(four_mbytes)
    176 run_and_assert_result(server_accept_submit, JSON_FILE_NAME, COMMON_URL, "success")
    177 if last_submission != four_mbytes:
    178    raise AssertionError("large file mismatch")
    179 cleanup_json_file()
    180 
    181 print("Large file test OK\n")
    182 
    183 # Test with long file name
    184 
    185 # Test with bad URL
    186 
    187 bogus_url = "notAUrl"
    188 create_json_file()
    189 run_and_assert_result(None, JSON_FILE_NAME, bogus_url, "error parsing URL")
    190 cleanup_json_file()
    191 
    192 print("Bad URL test OK\n")
    193 
    194 # Test with empty response
    195 
    196 server_response = bytes()
    197 create_json_file()
    198 run_and_assert_result(server_accept_submit, JSON_FILE_NAME, COMMON_URL, "success")
    199 check_submission()
    200 cleanup_json_file()
    201 
    202 print("Empty response test OK\n")
    203 
    204 # Test with large response
    205 
    206 server_response = four_mbytes
    207 create_json_file()
    208 run_and_assert_result(server_accept_submit, JSON_FILE_NAME, COMMON_URL, "success")
    209 check_submission()
    210 cleanup_json_file()
    211 
    212 print("Large response test OK\n")
    213 
    214 # Test with 404
    215 # NOTE: This succeeds since the client doesn't check the status code
    216 
    217 create_json_file()
    218 nonexistent_url = f"http://{BIND_HOST}:{BIND_PORT}/bad"
    219 run_and_assert_result(server_accept_submit, JSON_FILE_NAME, nonexistent_url, "success")
    220 cleanup_json_file()
    221 
    222 print("404 response test OK\n")
    223 
    224 # Test with no server on the port
    225 # NOTE: I'm assuming nothing else has been able to bind to the port
    226 
    227 print("Running no server test, this will take a few seconds...")
    228 
    229 create_json_file()
    230 run_and_assert_result(None, JSON_FILE_NAME, COMMON_URL, "error sending HTTP request")
    231 cleanup_json_file()
    232 
    233 print("No server test OK\n")
    234 
    235 # Test with server that hangs on response
    236 # NOTE: HttpPostFile doesn't currently set the timeouts. Defaults seem to be around 30 seconds,
    237 # but if they end up being longer than the 60 second driver timeout then this will fail.
    238 
    239 print("Running server hang test, this will take up to a minute...")
    240 
    241 server_hang_event = threading.Event()
    242 create_json_file()
    243 run_and_assert_result(server_hang, JSON_FILE_NAME, COMMON_URL, "error sending HTTP request")
    244 cleanup_json_file()
    245 server_hang_event = None
    246 
    247 print("Server hang test OK\n")