tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

cli.py (4789B)


      1 #!/usr/bin/env python
      2 # This Source Code Form is subject to the terms of the Mozilla Public
      3 # License, v. 2.0. If a copy of the MPL was not distributed with this
      4 # file, You can obtain one at https://mozilla.org/MPL/2.0/.
      5 
      6 import argparse
      7 import hashlib
      8 import os
      9 import threading
     10 
     11 import frida
     12 
     13 ARGUMENTS = {}
     14 SOCKETS = {}
     15 TERMINATE = threading.Condition()
     16 
     17 
     18 def store_for_target(target, data):
     19    filename = hashlib.sha1(data).hexdigest()
     20    directory = os.path.join(ARGUMENTS["output"], target)
     21 
     22    os.makedirs(directory, exist_ok=True)
     23 
     24    with open(os.path.join(directory, filename), "wb") as f:
     25        f.write(data)
     26 
     27 
     28 # --- asn1 ---
     29 
     30 
     31 def on_SEC_ASN1DecodeItem_Util(payload):
     32    if not "data" in payload:
     33        return
     34 
     35    store_for_target("asn1", bytes(payload["data"].values()))
     36 
     37 
     38 # --- certDN ---
     39 
     40 
     41 def on_CERT_AsciiToName(payload):
     42    if not "data" in payload:
     43        return
     44 
     45    store_for_target("certDN", payload["data"].encode())
     46 
     47 
     48 # --- ech ----
     49 
     50 
     51 def on_tls13_DecodeEchConfigs(payload):
     52    if not "data" in payload:
     53        return
     54 
     55    store_for_target("ech", bytes(payload["data"].values()))
     56 
     57 
     58 # --- pkcs7 ---
     59 
     60 
     61 def on_CERT_DecodeCertPackage(payload):
     62    if not "data" in payload:
     63        return
     64 
     65    store_for_target("pkcs7", bytes(payload["data"].values()))
     66 
     67 
     68 # --- pkcs8 ---
     69 
     70 
     71 def on_PK11_ImportDERPrivateKeyInfoAndReturnKey(payload):
     72    if not "data" in payload:
     73        return
     74 
     75    store_for_target("pkcs8", bytes(payload["data"].values()))
     76 
     77 
     78 # --- pkcs12 ---
     79 
     80 
     81 def on_SEC_PKCS12DecoderUpdate(payload):
     82    if not "data" in payload:
     83        return
     84 
     85    store_for_target("pkcs12", bytes(payload["data"].values()))
     86 
     87 
     88 # --- quickder ---
     89 
     90 
     91 def on_SEC_QuickDERDecodeItem_Util(payload):
     92    if not "data" in payload:
     93        return
     94 
     95    store_for_target("quickder", bytes(payload["data"].values()))
     96 
     97 
     98 # --- smime ---
     99 
    100 
    101 def on_NSS_CMSDecoder_Update(payload):
    102    if not "data" in payload:
    103        return
    104 
    105    store_for_target("smime", bytes(payload["data"].values()))
    106 
    107 
    108 # --- TLS ---
    109 
    110 
    111 def on_ssl_DefClose(payload):
    112    ss = payload["ss"]
    113    if not ss in SOCKETS:
    114        return
    115 
    116    # There is no way for us to determine (in a clean and future-proof)
    117    # way the variant (DTLS/TLS) and origin (client/server) of the
    118    # received data.
    119    # Since you want to minimize the corpus anyway, we just say it belongs
    120    # to all possible targets.
    121    data = SOCKETS[ss].lstrip(b"\x00")
    122    store_for_target("tls-client", data)
    123 
    124    base_output_path = os.path.abspath(ARGUMENTS["output"])
    125    for target in ["dtls-client", "dtls-server", "tls-client", "tls-server"]:
    126        if not os.path.exists(os.path.join(base_output_path, target)):
    127            os.symlink(os.path.join(base_output_path, "tls-client"),
    128                       os.path.join(base_output_path, target),
    129                       target_is_directory=True)
    130 
    131    del SOCKETS[ss]
    132 
    133 
    134 def on_ssl_DefRecv(payload):
    135    if not "data" in payload:
    136        return
    137 
    138    ss = payload["ss"]
    139    if not ss in SOCKETS:
    140        SOCKETS[ss] = bytes()
    141 
    142    SOCKETS[ss] += bytes(payload["data"].values())
    143 
    144 
    145 def on_ssl_DefRead(payload):
    146    if not "data" in payload:
    147        return
    148 
    149    ss = payload["ss"]
    150    if not ss in SOCKETS:
    151        SOCKETS[ss] = bytes()
    152 
    153    SOCKETS[ss] += bytes(payload["data"].values())
    154 
    155 
    156 def script_on_message(message, _data):
    157    if message["type"] != "send":
    158        print(message)
    159        return
    160 
    161    assert message["type"] == "send"
    162 
    163    payload = message["payload"]
    164    func = "on_" + payload["func"]
    165 
    166    assert func in globals()
    167    globals()[func](payload)
    168 
    169 
    170 def session_on_detached():
    171    with TERMINATE:
    172        TERMINATE.notify_all()
    173 
    174 
    175 def main():
    176    parser = argparse.ArgumentParser()
    177    parser.add_argument("--script", required=True, type=str)
    178    parser.add_argument("--nss-build",
    179                        required=True,
    180                        type=str,
    181                        help="e.g. /path/to/dist/Debug")
    182    parser.add_argument("--program", required=True, type=str)
    183    parser.add_argument("--output", required=True, type=str)
    184 
    185    args, programargs = parser.parse_known_args()
    186 
    187    global ARGUMENTS
    188    ARGUMENTS = vars(args)
    189 
    190    with open(args.script, "r") as f:
    191        script = f.read()
    192 
    193    pid = frida.spawn(program=args.program,
    194                      argv=programargs,
    195                      env={
    196                          **os.environ, "LD_LIBRARY_PATH":
    197                          os.path.join(args.nss_build, "lib")
    198                      })
    199    session = frida.attach(pid)
    200 
    201    script = session.create_script(script)
    202    script.load()
    203 
    204    script.on("message", script_on_message)
    205 
    206    session.resume()
    207    frida.resume(pid)
    208 
    209    session.on("detached", session_on_detached)
    210 
    211    with TERMINATE:
    212        TERMINATE.wait()
    213 
    214 
# Run the collector only when this file is executed as a script.
if __name__ == "__main__":
    main()