cli.py (4789B)
1 #!/usr/bin/env python 2 # This Source Code Form is subject to the terms of the Mozilla Public 3 # License, v. 2.0. If a copy of the MPL was not distributed with this 4 # file, You can obtain one at https://mozilla.org/MPL/2.0/. 5 6 import argparse 7 import hashlib 8 import os 9 import threading 10 11 import frida 12 13 ARGUMENTS = {} 14 SOCKETS = {} 15 TERMINATE = threading.Condition() 16 17 18 def store_for_target(target, data): 19 filename = hashlib.sha1(data).hexdigest() 20 directory = os.path.join(ARGUMENTS["output"], target) 21 22 os.makedirs(directory, exist_ok=True) 23 24 with open(os.path.join(directory, filename), "wb") as f: 25 f.write(data) 26 27 28 # --- asn1 --- 29 30 31 def on_SEC_ASN1DecodeItem_Util(payload): 32 if not "data" in payload: 33 return 34 35 store_for_target("asn1", bytes(payload["data"].values())) 36 37 38 # --- certDN --- 39 40 41 def on_CERT_AsciiToName(payload): 42 if not "data" in payload: 43 return 44 45 store_for_target("certDN", payload["data"].encode()) 46 47 48 # --- ech ---- 49 50 51 def on_tls13_DecodeEchConfigs(payload): 52 if not "data" in payload: 53 return 54 55 store_for_target("ech", bytes(payload["data"].values())) 56 57 58 # --- pkcs7 --- 59 60 61 def on_CERT_DecodeCertPackage(payload): 62 if not "data" in payload: 63 return 64 65 store_for_target("pkcs7", bytes(payload["data"].values())) 66 67 68 # --- pkcs8 --- 69 70 71 def on_PK11_ImportDERPrivateKeyInfoAndReturnKey(payload): 72 if not "data" in payload: 73 return 74 75 store_for_target("pkcs8", bytes(payload["data"].values())) 76 77 78 # --- pkcs12 --- 79 80 81 def on_SEC_PKCS12DecoderUpdate(payload): 82 if not "data" in payload: 83 return 84 85 store_for_target("pkcs12", bytes(payload["data"].values())) 86 87 88 # --- quickder --- 89 90 91 def on_SEC_QuickDERDecodeItem_Util(payload): 92 if not "data" in payload: 93 return 94 95 store_for_target("quickder", bytes(payload["data"].values())) 96 97 98 # --- smime --- 99 100 101 def on_NSS_CMSDecoder_Update(payload): 102 if not "data" in payload: 103 return 104 105 store_for_target("smime", bytes(payload["data"].values())) 106 107 108 # --- TLS --- 109 110 111 def on_ssl_DefClose(payload): 112 ss = payload["ss"] 113 if not ss in SOCKETS: 114 return 115 116 # There is no way for us to determine (in a clean and future-proof) 117 # way the variant (DTLS/TLS) and origin (client/server) of the 118 # received data. 119 # Since you want to minimize the corpus anyway, we just say it belongs 120 # to all possible targets. 121 data = SOCKETS[ss].lstrip(b"\x00") 122 store_for_target("tls-client", data) 123 124 base_output_path = os.path.abspath(ARGUMENTS["output"]) 125 for target in ["dtls-client", "dtls-server", "tls-client", "tls-server"]: 126 if not os.path.exists(os.path.join(base_output_path, target)): 127 os.symlink(os.path.join(base_output_path, "tls-client"), 128 os.path.join(base_output_path, target), 129 target_is_directory=True) 130 131 del SOCKETS[ss] 132 133 134 def on_ssl_DefRecv(payload): 135 if not "data" in payload: 136 return 137 138 ss = payload["ss"] 139 if not ss in SOCKETS: 140 SOCKETS[ss] = bytes() 141 142 SOCKETS[ss] += bytes(payload["data"].values()) 143 144 145 def on_ssl_DefRead(payload): 146 if not "data" in payload: 147 return 148 149 ss = payload["ss"] 150 if not ss in SOCKETS: 151 SOCKETS[ss] = bytes() 152 153 SOCKETS[ss] += bytes(payload["data"].values()) 154 155 156 def script_on_message(message, _data): 157 if message["type"] != "send": 158 print(message) 159 return 160 161 assert message["type"] == "send" 162 163 payload = message["payload"] 164 func = "on_" + payload["func"] 165 166 assert func in globals() 167 globals()[func](payload) 168 169 170 def session_on_detached(): 171 with TERMINATE: 172 TERMINATE.notify_all() 173 174 175 def main(): 176 parser = argparse.ArgumentParser() 177 parser.add_argument("--script", required=True, type=str) 178 parser.add_argument("--nss-build", 179 required=True, 180 type=str, 181 help="e.g. /path/to/dist/Debug") 182 parser.add_argument("--program", required=True, type=str) 183 parser.add_argument("--output", required=True, type=str) 184 185 args, programargs = parser.parse_known_args() 186 187 global ARGUMENTS 188 ARGUMENTS = vars(args) 189 190 with open(args.script, "r") as f: 191 script = f.read() 192 193 pid = frida.spawn(program=args.program, 194 argv=programargs, 195 env={ 196 **os.environ, "LD_LIBRARY_PATH": 197 os.path.join(args.nss_build, "lib") 198 }) 199 session = frida.attach(pid) 200 201 script = session.create_script(script) 202 script.load() 203 204 script.on("message", script_on_message) 205 206 session.resume() 207 frida.resume(pid) 208 209 session.on("detached", session_on_detached) 210 211 with TERMINATE: 212 TERMINATE.wait() 213 214 215 if __name__ == "__main__": 216 main()