helpers.py (10058B)
import argparse
import json
import os
import re
import socket
import subprocess
import sys
import tempfile
import threading
import time
from contextlib import suppress
from copy import deepcopy
from urllib.parse import urlparse

import webdriver
from mozprofile import Preferences, Profile
from mozrunner import FirefoxRunner

from .context import using_context


class Browser:
    """Launch and control a Firefox instance through ``FirefoxRunner``.

    When ``use_bidi`` is set, Firefox is started with
    ``--remote-debugging-port 0`` and the WebSocket host and port are read
    from the ``WebDriverBiDiServer.json`` file in the profile folder.
    """

    def __init__(
        self,
        binary,
        profile,
        env=None,
        extra_args=None,
        extra_prefs=None,
        log_level=None,
        truncate_enabled=True,
        use_bidi=False,
        use_marionette=False,
    ):
        """Prepare the profile and the runner without starting Firefox.

        :param binary: Path of the Firefox binary handed to ``FirefoxRunner``.
        :param profile: Profile object; its ``profile`` attribute is used as
                        the profile folder and ``set_preferences`` is called
                        on it.
        :param env: Environment forwarded to ``FirefoxRunner``.
        :param extra_args: Optional list of additional command line arguments.
        :param extra_prefs: Optional dict of additional preferences. It is
                            never modified.
        :param log_level: If not None, value for ``remote.log.level``.
        :param truncate_enabled: If False, ``remote.log.truncate`` is set
                                 to False.
        :param use_bidi: Start Firefox with ``--remote-debugging-port 0``.
        :param use_marionette: Start Firefox with ``-marionette``.
        """
        self.profile = profile

        self.extra_args = extra_args
        self.extra_prefs = extra_prefs
        self.log_level = log_level
        self.truncate_enabled = truncate_enabled
        self.use_bidi = use_bidi
        self.use_marionette = use_marionette

        self.bidi_port_file = None
        self.remote_agent_host = None
        self.remote_agent_port = None

        cmdargs = ["-no-remote"]

        if use_bidi:
            self.webdriver_bidi_file = os.path.join(
                self.profile.profile, "WebDriverBiDiServer.json"
            )
            # Remove a stale file so that start() only sees the one written
            # by the instance launched from this object.
            with suppress(FileNotFoundError):
                os.remove(self.webdriver_bidi_file)

        if use_marionette:
            cmdargs.extend(["-marionette"])

        # Avoid modifying extra_prefs to prevent side-effects with the "browser"
        # fixture, which checks session equality and would create a new session
        # each time. Work on a copy so the caller's dict stays untouched.
        prefs = dict(self.extra_prefs or {})

        if log_level is not None:
            prefs["remote.log.level"] = log_level

        if truncate_enabled is False:
            prefs["remote.log.truncate"] = False

        self.profile.set_preferences(prefs)

        if self.use_bidi:
            cmdargs.extend(["--remote-debugging-port", "0"])
        if self.extra_args is not None:
            cmdargs.extend(self.extra_args)

        print(f"Run command: {binary} {cmdargs}")
        self.runner = FirefoxRunner(
            binary=binary, profile=self.profile, cmdargs=cmdargs, env=env
        )

    @property
    def websocket_url(self):
        """Return the ``ws://host:port`` URL of the running WebSocket server.

        :raises Exception: If host or port have not been set by ``start()``.
        """
        if self.remote_agent_host is None or self.remote_agent_port is None:
            raise Exception("No WebSocket server running")

        return f"ws://{self.remote_agent_host}:{self.remote_agent_port}"

    @property
    def is_running(self):
        """Return whether the runner reports the browser as running."""
        return self.runner.is_running()

    def start(self):
        """Start Firefox and, with BiDi enabled, read the connection details.

        NOTE: the wait for ``WebDriverBiDiServer.json`` has no timeout.
        """
        # Start Firefox.
        self.runner.start()

        if self.use_bidi:
            # Wait until the WebDriverBiDiServer.json file is ready
            while not os.path.exists(self.webdriver_bidi_file):
                time.sleep(0.1)

            # Read the connection details from file. The context manager makes
            # sure the file handle is closed again.
            with open(self.webdriver_bidi_file, encoding="utf-8") as bidi_file:
                data = json.load(bidi_file)
            self.remote_agent_host = data["ws_host"]
            self.remote_agent_port = int(data["ws_port"])

    def quit(self, clean_profile=True):
        """Stop the browser if running, and clean up the runner.

        :param clean_profile: If True, also clean up the profile.
        """
        if self.is_running:
            self.runner.stop()
            self.runner.cleanup()

        if clean_profile:
            self.profile.cleanup()

    def wait(self):
        """Block until the browser process has exited, if it is running."""
        if self.is_running:
            self.runner.wait()


class Geckodriver:
    """Run a geckodriver process and hold a WebDriver session for it.

    geckodriver is started with ``--port 0``; the actual port is parsed from
    the "Listening on" line of its standard output.
    """

    PORT_RE = re.compile(rb".*Listening on [^ :]*:(\d+)")

    # Seconds to wait in start() for geckodriver to accept connections.
    STARTUP_TIMEOUT = 10

    def __init__(self, configuration, hostname=None, extra_args=None, extra_env=None):
        """Store the settings used by ``start()``.

        :param configuration: Mapping providing the "webdriver",
                              "capabilities", "host" and "browser" entries.
        :param hostname: Host to connect to. Falls back to
                         ``configuration["host"]``.
        :param extra_args: Optional list of additional geckodriver arguments.
        :param extra_env: Optional dict of additional environment variables.
        """
        self.config = configuration["webdriver"]
        self.requested_capabilities = configuration["capabilities"]
        self.hostname = hostname or configuration["host"]
        self.extra_args = extra_args or []
        self.env = configuration["browser"]["env"]
        self.extra_env = extra_env or {}

        self.command = None
        self.proc = None
        self.port = None
        self.reader_thread = None

        self.capabilities = {"alwaysMatch": self.requested_capabilities}
        self.session = None

    @property
    def remote_agent_port(self):
        """Return the port of the session's ``webSocketUrl`` capability."""
        webSocketUrl = self.session.capabilities.get("webSocketUrl")
        assert webSocketUrl is not None

        return urlparse(webSocketUrl).port

    def start(self):
        """Spawn geckodriver, wait until it accepts connections and create
        the (not yet started) WebDriver session object.

        :returns: This instance.
        :raises ChildProcessError: If geckodriver exits while starting up.
        :raises OSError: If no port could be read from the output in time.
        :raises ConnectionRefusedError: If the port never accepted a
                                        connection in time.
        """
        self.command = (
            [self.config["binary"], "--port", "0"]
            + self.config["args"]
            + self.extra_args
        )

        print(f"Running command: {' '.join(self.command)}")
        all_env = deepcopy(self.env)
        all_env.update(self.extra_env)

        self.proc = subprocess.Popen(self.command, env=all_env, stdout=subprocess.PIPE)

        # The output is consumed on a separate thread, which sets self.port
        # through processOutputLine() once the port has been announced.
        self.reader_thread = threading.Thread(
            target=readOutputLine,
            args=(self.proc.stdout, self.processOutputLine),
            daemon=True,
        )
        self.reader_thread.start()

        # Wait for the port to become ready. A monotonic clock keeps the
        # deadline unaffected by system clock adjustments.
        deadline = time.monotonic() + self.STARTUP_TIMEOUT
        while time.monotonic() < deadline:
            returncode = self.proc.poll()
            if returncode is not None:
                raise ChildProcessError(
                    f"geckodriver terminated with code {returncode}"
                )
            if self.port is not None:
                with socket.socket() as sock:
                    if sock.connect_ex((self.hostname, self.port)) == 0:
                        break
            # Sleep after every unsuccessful attempt, including a refused
            # connection, so that the loop never spins at full speed.
            time.sleep(0.1)
        else:
            if self.port is None:
                raise OSError(
                    f"Failed to read geckodriver port started on {self.hostname}"
                )
            raise ConnectionRefusedError(
                f"Failed to connect to geckodriver on {self.hostname}:{self.port}"
            )

        self.session = webdriver.Session(
            self.hostname, self.port, capabilities=self.capabilities
        )

        return self

    def processOutputLine(self, line):
        """Echo a line of geckodriver output and extract the listening port.

        :param line: A single line as read from the process output (bytes
                     when read from the binary pipe created by ``start()``).
        """
        # The pipe delivers bytes, but a text stream only accepts str. Decode
        # for echoing so that the reader thread does not die with a TypeError.
        if isinstance(line, bytes):
            text = line.decode("utf-8", errors="replace")
        else:
            text = line
        sys.stdout.write(text)

        if self.port is None:
            m = self.PORT_RE.match(line)
            if m is not None:
                self.port = int(m.groups()[0])

    async def stop(self):
        """End the session, kill geckodriver and join the reader thread."""
        try:
            if self.session is not None:
                await self.delete_session()
        finally:
            # Always tear down the process, even if ending the session failed.
            if self.proc is not None:
                self.proc.kill()
                # Reap the killed process so no zombie is left behind.
                self.proc.wait()
            self.port = None
            if self.reader_thread is not None:
                self.reader_thread.join()
            if self.proc is not None and self.proc.stdout is not None:
                self.proc.stdout.close()

    def new_session(self):
        """Start the WebDriver session."""
        self.session.start()

    async def delete_session(self):
        """End the BiDi session, if there is one, and then the session."""
        if self.session.bidi_session is not None:
            await self.session.bidi_session.end()

        self.session.end()


def clear_pref(session, pref):
    """Clear the user-defined value from the specified preference.

    :param session: Session used to run the script in chrome context.
    :param pref: Name of the preference.
    """
    with using_context(session, "chrome"):
        session.execute_script(
            """
            const { Preferences } = ChromeUtils.importESModule(
              "resource://gre/modules/Preferences.sys.mjs"
            );
            Preferences.reset(arguments[0]);
            """,
            args=(pref,),
        )


def create_custom_profile(base_profile, default_preferences, clone=True):
    """Create a profile, either cloned from an existing one or a fresh one.

    :param base_profile: Profile passed to ``Profile.clone``.
    :param default_preferences: Preferences set on a freshly created profile.
    :param clone: If True clone ``base_profile``, otherwise create a new
                  profile in a temporary "wdspec-" folder.
    :returns: The created profile.
    """
    if clone:
        # Clone the current profile and remove the prefs.js file to only
        # keep default preferences as set in user.js.
        profile = Profile.clone(base_profile)
        prefs_path = os.path.join(profile.profile, "prefs.js")
        with suppress(FileNotFoundError):
            os.remove(prefs_path)
    else:
        profile = Profile(tempfile.mkdtemp(prefix="wdspec-"))
        profile.set_preferences(default_preferences)

    return profile


def get_arg_value(arg_names, args):
    """Get an argument value from a list of arguments

    This assumes that argparse argument parsing is close enough to the target
    to be compatible, at least with the set of inputs we have.

    :param arg_names: - List of names for the argument e.g. ["--foo", "-f"]
    :param args: - List of arguments to parse
    :returns: - Optional string argument value
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(*arg_names, action="store", dest="value", default=None)
    parsed, _ = parser.parse_known_args(args)
    return parsed.value


def get_pref(session, pref):
    """Get the value of the specified preference.

    :param session: Session used to run the script in chrome context.
    :param pref: Name of the preference.
    :returns: The value returned by the script, which passes null as the
              fallback for the lookup.
    """
    with using_context(session, "chrome"):
        pref_value = session.execute_script(
            """
            const { Preferences } = ChromeUtils.importESModule(
              "resource://gre/modules/Preferences.sys.mjs"
            );

            let pref = arguments[0];

            prefs = new Preferences();
            return prefs.get(pref, null);
            """,
            args=(pref,),
        )
        return pref_value


def get_profile_folder(firefox_options):
    """Return the value of ``--profile`` from the Firefox options' "args".

    :param firefox_options: Mapping with an "args" list of arguments.
    :returns: The profile folder, or None if the argument is not present.
    """
    return get_arg_value(["--profile"], firefox_options["args"])


def readOutputLine(stream, callback):
    """Pass each line of a stream to a callback until the stream is exhausted.

    :param stream: Object with a ``readline()`` method.
    :param callback: Callable invoked with every non-empty line read.
    """
    while True:
        line = stream.readline()
        if not line:
            # An empty read marks the end of the stream.
            break

        callback(line)


def read_user_preferences(profile_path, filename="user.js"):
    """Read a preferences file of a profile into a dict.

    :param profile_path: Folder of the profile.
    :param filename: Name of the preferences file inside the profile folder.
    :returns: Dict mapping preference names to their values.
    """
    prefs_file = os.path.join(profile_path, filename)

    return {
        pref_name: pref_value
        for pref_name, pref_value in Preferences().read_prefs(prefs_file)
    }


def set_pref(session, pref, value):
    """Set the value of the specified preference.

    :param session: Session used to run the script in chrome context.
    :param pref: Name of the preference.
    :param value: The value to set the preference to. If the value is None,
                  reset the preference to its default value. If no default
                  value exists, the preference will cease to exist.
    """
    if value is None:
        clear_pref(session, pref)
        return

    with using_context(session, "chrome"):
        session.execute_script(
            """
            const { Preferences } = ChromeUtils.importESModule(
              "resource://gre/modules/Preferences.sys.mjs"
            );

            const [pref, value] = arguments;

            prefs = new Preferences();
            prefs.set(pref, value);
            """,
            args=(pref, value),
        )