generate_builtins.py (6166B)
"""Generate the Rust table of built-in certificate hashes for Abridged Certs.

The script obtains a list of DER certificates, either by downloading the CCADB
"WebTrustListAsOf" CSV report for a given date or by reading a previously
saved JSON cache, assigns each certificate a sequential identifier, and writes
a Rust source file holding the SHA-256 hash of every certificate. The result
is formatted in place with ``rustfmt``.
"""

import argparse
import csv
import json
import struct
import subprocess
import sys
from datetime import datetime
from io import StringIO

import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization

# Format: CCADB Record Creation Date, SHA-256 Fingerprint, Subject Key Identifier, Authority Key Identifier, Root or Intermediate Certificate Record, X.509 Certificate PEM
REPORT_URL = "https://ccadb.my.salesforce-sites.com/ccadb/WebTrustListAsOf?ListDate={}"
DATE_ADDITION_COL = "CCADB Record Creation Date"
CERT_PEM_COL = "X.509 Certificate PEM"


class IdentifierAllocator:
    """Hand out sequential identifiers of the form ``prefix + big-endian u16``."""

    # Prefix should be a byte string.
    def __init__(self, prefix):
        self.prefix = prefix
        self.position = 0

    def getIdentifier(self):
        """Return the next identifier and advance the internal counter.

        The counter is packed with ``">H"`` (unsigned 16-bit), so
        ``struct.error`` is raised once the counter exceeds 65535.
        """
        result = self.prefix + struct.pack(">H", self.position)
        self.position += 1
        return result


def get_webtrust_certs(list_date):
    """Download the CCADB report for *list_date* and parse its certificates.

    The raw CSV is also saved to ``webtrust_certs-<list_date>.csv`` in the
    current working directory.

    Returns a list of ``(record_creation_datetime, der_bytes)`` tuples in the
    order the rows appear in the report.

    Raises ``requests.HTTPError`` when the server answers with an error status.
    """
    url = REPORT_URL.format(list_date)
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    csv_data = response.text

    # Explicit encoding and newline="" keep the saved copy identical to the
    # response text regardless of the platform's locale or line-ending rules.
    with open(
        f"webtrust_certs-{list_date}.csv", "w", encoding="utf-8", newline=""
    ) as certs_file:
        certs_file.write(csv_data)

    output = []
    for row in csv.DictReader(StringIO(csv_data)):
        timestamp = datetime.strptime(row[DATE_ADDITION_COL], "%Y-%m-%dT%H:%M:%SZ")
        cert = x509.load_pem_x509_certificate(
            row[CERT_PEM_COL].encode("ascii"), default_backend()
        )
        output.append((timestamp, cert.public_bytes(serialization.Encoding.DER)))
    print(f"Loaded {len(output)} certs from {url}")
    return output


def create_cert_dict(certs):
    """Map freshly allocated identifiers to certificates, oldest record first.

    *certs* is an iterable of ``(timestamp, der_bytes)`` tuples. It is sorted
    by timestamp (stable, so ties keep their input order) without modifying
    the caller's list, and each certificate receives the next identifier from
    an ``IdentifierAllocator`` with prefix ``b"\\xff"``.

    Returns a dict ``{identifier_bytes: der_bytes}`` in allocation order.
    """
    ordered = sorted(certs, key=lambda entry: entry[0])
    id_alloc = IdentifierAllocator(b"\xff")
    return {id_alloc.getIdentifier(): der for _, der in ordered}


def load_json_cache(jf):
    """Load a cached certificate list from the JSON file at path *jf*.

    The file must contain a JSON object with non-empty ``data``,
    ``list_date`` and ``creation_date`` entries. ``data`` maps hex-encoded
    identifiers to hex-encoded certificates; both sides are decoded to
    ``bytes`` in the returned dict.

    Raises ``ValueError`` when the structure is not as described above.
    """
    with open(jf, encoding="utf-8") as f:
        j = json.load(f)

    # Validate explicitly: `assert` disappears under `python -O`.
    if not isinstance(j, dict):
        raise ValueError(f"{jf}: expected a JSON object at the top level")
    for key in ("data", "list_date", "creation_date"):
        if not j.get(key):
            raise ValueError(f"{jf}: missing or empty {key!r} entry")

    j["data"] = {bytes.fromhex(k): bytes.fromhex(v) for k, v in j["data"].items()}
    return j


def cert_to_hash_rust_array(cert):
    """Return the SHA-256 of *cert* (bytes) as Rust array text ``[0x.., ...]``."""
    digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
    digest.update(cert)

    # Finalize the hash and get the digest
    sha256_hash = digest.finalize()

    # Format the hash as a Rust array
    rust_array = ", ".join(f"0x{byte:02x}" for byte in sha256_hash)
    return f"[{rust_array}]"


def make_rust_file_contents(certs, generation_date, list_date):
    """Build the text of the generated Rust source file.

    *certs* maps identifiers to DER certificates; only the values are used,
    in the dict's iteration order. *generation_date* and *list_date* are
    interpolated into the header comment.

    NOTE(review): the emitted ``id_to_hash`` uses the identifier's trailing
    u16 as an index into the hash table, so the iteration order of *certs*
    has to match identifier order. ``create_cert_dict`` produces such a dict;
    a JSON cache is assumed to preserve that order -- confirm.
    """
    header = f"""
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */

use log;
use std::sync::OnceLock;
use thin_vec::ThinVec;

// Autogenerated via security/manager/ssl/abridged_certs/tools/generate_builtins.py
// Generation Date: {generation_date}
// Based on list version: {list_date}
"""
    interface = """

// Public Interface

/// Given an Abridged Cert Identifier, lookup the hash of the corresponding certificate
pub fn id_to_hash(id: &[u8; 3]) -> Option<&ThinVec<u8>> {
    let index: usize = u16::from_be_bytes([id[1], id[2]]).into();
    log::trace!("Parsed identifier {:#02X?} as index {}", id, index);
    ABRIDGED_CERTS_BUILTINS_HASHES.get_or_init(init_hashes).get(index)
}

/// Get a list of hashes needed for this Abridged Certs scheme
pub fn get_needed_hashes() -> Option<&'static ThinVec<ThinVec<u8>>> {
    Some(ABRIDGED_CERTS_BUILTINS_HASHES.get_or_init(init_hashes))
}

// Private Implementation

/// This is currently built whenever it is accessed (similar a to lazy_static)
/// However, we may want explicit control in the future , e.g. if we want to delay
/// construction until after cert_storage has synced, or if we want to use a manifest
/// from remote settings.
static ABRIDGED_CERTS_BUILTINS_HASHES: OnceLock<ThinVec<ThinVec<u8>>> = OnceLock::new();

/// rustc / LLVM has a number of outstanding bugs in its code generation for large
/// functions. See the discussion in Bug 1969383 for why this format was selected.
"""
    init_tail = """
    for entry in ABRIDGED_CERT_BYTES.chunks(32) {
        m.push(ThinVec::from(entry));
    }
    m.shrink_to_fit();
    m
}
"""

    parts = [
        header,
        interface,
        # One flat byte array: 32 bytes (one SHA-256) per certificate.
        f"const ABRIDGED_CERT_BYTES: [u8; {len(certs) * 32}] = [",
    ]
    parts.extend(
        cert_to_hash_rust_array(cert).strip("[]") + ", " for cert in certs.values()
    )
    parts.append("];\n")
    parts.append("""pub fn init_hashes() -> ThinVec<ThinVec<u8>>{""")
    parts.append(f"""let mut m = ThinVec::with_capacity({len(certs)});""")
    parts.append(init_tail)
    return "".join(parts)


def main():
    """Parse the command line, generate the Rust file and run rustfmt on it.

    Returns the process exit status (0 on success).
    """
    today = datetime.now().strftime("%Y-%m-%d")
    parser = argparse.ArgumentParser(
        description="Builds a map from identifiers to WebPKI Intermediate and Root Certificates"
    )
    parser.add_argument(
        "-d",
        "--date",
        help="Specify the date you want the list as-of (YYYY-MM-DD format)",
        type=str,
        default=today,
    )
    parser.add_argument(
        "-o",
        "--output",
        help="Specify the output file path",
        type=str,
        default="builtins.rs",
    )
    parser.add_argument(
        "-i",
        "--input",
        help="Specify a cached version of the list from JSON. Overrides date option.",
        type=str,
        default=None,
    )
    args = parser.parse_args()

    if args.input:
        # The cache carries its own dates; they replace --date and today's date.
        cache = load_json_cache(args.input)
        certs = cache["data"]
        list_date = cache["list_date"]
        generation_date = cache["creation_date"]
    else:
        fetched = get_webtrust_certs(args.date)
        print(f"Fetched {len(fetched)} certificates")
        certs = create_cert_dict(fetched)
        list_date = args.date
        generation_date = today

    with open(args.output, "w", encoding="utf-8") as rust_file:
        rust_file.write(make_rust_file_contents(certs, generation_date, list_date))

    try:
        subprocess.run(
            ["rustfmt", args.output], capture_output=True, text=True, check=True
        )
    except subprocess.CalledProcessError as err:
        # The output was captured, so surface rustfmt's diagnostics before
        # propagating the failure.
        sys.stderr.write(err.stderr or "")
        raise

    print(f"Generated file output to {args.output}")
    return 0


if __name__ == "__main__":
    sys.exit(main())