tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

_module.py (5381B)


      1 import asyncio
      2 import functools
      3 from typing import (
      4    Any,
      5    Callable,
      6    Optional,
      7    Mapping,
      8    MutableMapping,
      9    TYPE_CHECKING,
     10 )
     11 
     12 from ..undefined import UNDEFINED
     13 
     14 if TYPE_CHECKING:
     15    from ..client import BidiSession
     16 
     17 
     18 class command:
     19    """Decorator for implementing bidi commands.
     20 
     21    Implementing a command involves specifying an async function that
     22    builds the parameters to the command. The decorator arranges those
     23    parameters to be turned into a send_command call, using the class
     24    and method names to determine the method in the call.
     25 
     26    Commands decorated in this way don't return a future, but await
     27    the actual response. In some cases it can be useful to
     28    post-process this response before returning it to the client. This
     29    can be done by specifying a second decorated method like
     30    @command_name.result. That method will then be called once the
     31    result of the original command is known, and the return value of
     32    the method used as the response of the command. If this method
     33    is specified, the `raw_result` parameter of the command can be set
     34    to `True` to get the result without post-processing.
     35 
     36    So for an example, if we had a command test.testMethod, which
     37    returned a result which we want to convert to a TestResult type,
     38    the implementation might look like:
     39 
     40    class Test(BidiModule):
     41        @command
     42        def test_method(self, test_data=None):
     43            return {"testData": test_data}
     44 
     45       @test_method.result
     46       def convert_test_method_result(self, result):
     47           return TestData(**result)
     48    """
     49 
     50    def __init__(self, fn: Callable[..., Mapping[str, Any]]):
     51        self.params_fn = fn
     52        self.result_fn: Optional[Callable[..., Any]] = None
     53 
     54    def result(self, fn: Callable[[Any, MutableMapping[str, Any]],
     55                                  Any]) -> None:
     56        self.result_fn = fn
     57 
     58    def __set_name__(self, owner: Any, name: str) -> None:
     59        # This is called when the class is created
     60        # see https://docs.python.org/3/reference/datamodel.html#object.__set_name__
     61        params_fn = self.params_fn
     62        result_fn = self.result_fn
     63 
     64        @functools.wraps(params_fn)
     65        async def inner(self: Any, **kwargs: Any) -> Any:
     66            raw_result = kwargs.pop("raw_result", False)
     67            extension_params = kwargs.pop("_extension_params", {})
     68 
     69            params = remove_undefined(params_fn(self, **kwargs))
     70 
     71            # Convert the classname and the method name to a bidi command name
     72            mod_name = owner.__name__[0].lower() + owner.__name__[1:]
     73            if hasattr(owner, "prefix"):
     74                mod_name = f"{owner.prefix}:{mod_name}"
     75            cmd_name = f"{mod_name}.{to_camelcase(name)}"
     76 
     77            # Verify specified vendor parameters
     78            for key in extension_params:
     79                if ":" not in key:
     80                    raise ValueError(f"Extension parameter '{key}' misses prefix.")
     81 
     82            # Merge into params (vendor keys win if duplicates)
     83            params.update(extension_params)
     84 
     85            future = await self.session.send_command(cmd_name, params)
     86            result = await future
     87 
     88            if result_fn is not None and not raw_result:
     89                # Convert the result if we have a conversion function defined
     90                if asyncio.iscoroutinefunction(result_fn):
     91                    result = await result_fn(self, result)
     92                else:
     93                    result = result_fn(self, result)
     94            return result
     95 
     96        # Overwrite the method on the owner class with the wrapper
     97        setattr(owner, name, inner)
     98 
     99 
    100 class BidiModule:
    101 
    102    def __init__(self, session: "BidiSession"):
    103        self.session = session
    104 
    105 
    106 def to_camelcase(name: str) -> str:
    107    """Convert a python style method name foo_bar to a BiDi command name fooBar"""
    108    parts = name.split("_")
    109    parts[0] = parts[0].lower()
    110    for i in range(1, len(parts)):
    111        parts[i] = parts[i].title()
    112    return "".join(parts)
    113 
    114 
    115 def remove_undefined(obj: Any) -> Any:
    116    """
    117    Removes entries from a dictionary where the value is UNDEFINED. Also removes
    118    UNDEFINED values from lists. Recursively processes nested dictionaries and
    119    lists.
    120 
    121    >>> from ..undefined import UNDEFINED
    122    >>> remove_undefined({"a": 1, "b": UNDEFINED, "c": 3})
    123    {'a': 1, 'c': 3}
    124 
    125    >>> remove_undefined({"a": 1, "b": {"x": UNDEFINED, "y": 2}, "c": UNDEFINED})
    126    {'a': 1, 'b': {'y': 2}}
    127 
    128    >>> remove_undefined({"a": 1, "b": [1, UNDEFINED, 3], "c": UNDEFINED})
    129    {'a': 1, 'b': [1, 3]}
    130 
    131    >>> remove_undefined({"a": 1, "b": [{"x": UNDEFINED, "y": 2}], "c": UNDEFINED})
    132    {'a': 1, 'b': [{'y': 2}]}
    133 
    134    >>> remove_undefined({"a": UNDEFINED, "b": {"x": UNDEFINED}})
    135    {'b': {}}
    136 
    137    >>> remove_undefined({})
    138    {}
    139 
    140    >>> remove_undefined([])
    141    []
    142 
    143    >>> remove_undefined(1)
    144    1
    145 
    146    >>> remove_undefined("foo")
    147    'foo'
    148 
    149    >>> remove_undefined(None)
    150 
    151    """
    152    if isinstance(obj, Mapping):
    153        new_obj = {}
    154        for key, value in obj.items():
    155            if value is not UNDEFINED:
    156                new_obj[key] = remove_undefined(value)
    157        return new_obj
    158    elif isinstance(obj, list):
    159        new_list = []
    160        for item in obj:
    161            if item is not UNDEFINED:
    162                new_list.append(remove_undefined(item))
    163        return new_list
    164    return obj