test_functional.py (2277B)
# mypy: allow-untyped-defs

try:
    from importlib import reload
except ImportError:
    pass
import json
import os
import queue
import tempfile
import threading

import pytest

from . import serve
from wptserve import logger


class ServerProcSpy(serve.ServerProc):
    """`serve.ServerProc` subclass that records each instance once started.

    Recording only happens while the class attribute `instances` holds a
    queue; when it is `None`, `start` simply defers to the parent class.
    """

    # Queue receiving started instances, or None when recording is disabled.
    instances = None

    def start(self, *args, **kwargs):
        """Start via the parent class, then publish `self` to `instances`.

        Returns whatever the parent `start` returned.
        """
        outcome = super().start(*args, **kwargs)

        if ServerProcSpy.instances is None:
            return outcome

        ServerProcSpy.instances.put(self)
        return outcome


# Swap the spy in so that `serve` constructs spies instead of plain
# `ServerProc` objects.
serve.ServerProc = ServerProcSpy  # type: ignore


@pytest.fixture()
def server_subprocesses():
    """Yield a fresh queue that collects every started `ServerProcSpy`.

    Recording is switched off again (`instances` reset to `None`) once the
    test has finished with the fixture.
    """
    started = queue.Queue()
    ServerProcSpy.instances = started
    yield started
    ServerProcSpy.instances = None


@pytest.fixture()
def tempfile_name():
    """Yield the path of a temporary file created with `tempfile.mkstemp`.

    The descriptor returned by `mkstemp` is closed and the file removed after
    the test has finished with the fixture.
    """
    descriptor, path = tempfile.mkstemp()
    yield path
    os.close(descriptor)
    os.remove(path)


def test_subprocess_exit(server_subprocesses, tempfile_name):
    """Check that `serve.run` returns once a server subprocess has exited.

    The server runs in a background thread. The second subprocess recorded by
    the spy is asked to shut down and waited on; the thread running
    `serve.run` must then finish within the timeout.
    """
    timeout = 30

    def target():
        # By default the server first spawns a child process that validates
        # the local system configuration. That process has nothing to do
        # with the behavior under test, yet (at the time of writing) the
        # parent builds it with the same constructor used for the
        # long-running processes this test cares about. Turning the check
        # off ensures the constructor only ever creates relevant processes.
        settings = {
            "browser_host": "localhost",
            "alternate_hosts": {"alt": "127.0.0.1"},
            "check_subdomains": False,
        }
        with open(tempfile_name, "w") as config_file:
            json.dump(settings, config_file)

        # The `logger` module of the wptserve package is built around a
        # singleton, which makes it awkward to test. To avoid clashing with
        # other tests that depend on that module, any state it already holds
        # is thrown away by explicitly reloading it.
        reload(logger)

        serve.run(config_path=tempfile_name)

    server_thread = threading.Thread(target=target)
    server_thread.start()

    # The first recorded process is discarded; the second one is the process
    # that gets shut down below.
    server_subprocesses.get(block=True, timeout=timeout)
    server_proc = server_subprocesses.get(block=True, timeout=timeout)
    server_proc.request_shutdown()
    server_proc.wait()

    server_thread.join(timeout)

    assert not server_thread.is_alive()