#!/usr/bin/env python
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Reads a certificate specification from stdin or a file and outputs a
signed x509 certificate with the desired properties.

The input format is as follows:

issuer:<issuer distinguished name specification>
subject:<subject distinguished name specification>
[version:{1,2,3,4}]
[validity:<YYYYMMDD-YYYYMMDD|duration in days>]
[issuerKey:<key specification>]
[subjectKey:<key specification>]
[signature:{sha256WithRSAEncryption,sha1WithRSAEncryption,
            md5WithRSAEncryption,ecdsaWithSHA256,ecdsaWithSHA384,
            ecdsaWithSHA512}]
[serialNumber:<integer in the interval [1, 127]>]
[extension:<extension name:<extension-specific data>>]
[...]

Known extensions are:
basicConstraints:[cA],[pathLenConstraint]
keyUsage:[digitalSignature,nonRepudiation,keyEncipherment,
          dataEncipherment,keyAgreement,keyCertSign,cRLSign]
extKeyUsage:[serverAuth,clientAuth,codeSigning,emailProtection,
             nsSGC, # Netscape Server Gated Crypto
             OCSPSigning,timeStamping,tlsBinding]
subjectAlternativeName:[<dNSName|directoryName|rfc822Name|"ip4:"iPV4Address>,...]
authorityInformationAccess:<OCSP URI>
certificatePolicies:[<policy OID|"any">,...]
nameConstraints:{permitted,excluded}:[<dNSName|directoryName>,...]
nsCertType:sslServer
TLSFeature:[<TLSFeature>,...]
embeddedSCTList:[<key specification>:<YYYYMMDD>[:<leaf index>],...]
delegationUsage:
qcStatements:[<statement OID[:info OID]>,...]

Where:
  [] indicates an optional field or component of a field
  <> indicates a required component of a field
  {} indicates a choice of exactly one value among a set of values
  [a,b,c] indicates a list of potential values, of which zero or more
          may be used

For instance, the version field is optional. However, if it is
specified, it must have exactly one value from the set {1,2,3,4}.

Most fields have reasonable default values. By default one shared RSA
key is used for all signatures and subject public key information
fields. Using "issuerKey:<key specification>" or
"subjectKey:<key specification>" causes a different key to be used for
signing or as the subject public key information field, respectively.
See pykey.py for the list of available specifications.
The signature algorithm is sha256WithRSAEncryption by default.

The validity period may be specified as either concrete notBefore and
notAfter values or as a validity period centered around 'now'. For the
latter, this will result in a notBefore of 'now' - duration/2 and a
notAfter of 'now' + duration/2.

Issuer and subject distinguished name specifications are of the form
'[stringEncoding]/C=XX/O=Example/CN=example.com'. C (country name), ST
(state or province name), L (locality name), O (organization name), OU
(organizational unit name), CN (common name) and emailAddress (email
address) are currently supported. The optional stringEncoding field may
be 'utf8String' or 'printableString'. If the given string does not
contain a '/', it is assumed to represent a common name. If an empty
string is provided, then an empty distinguished name is returned.
DirectoryNames also use this format. When specifying a directoryName in
a nameConstraints extension, the implicit form may not be used.

In a subjectAlternativeName, an entry containing '/' is treated as a
directoryName, otherwise an entry containing '@' is treated as an
rfc822Name, otherwise an entry starting with 'ip4:' is treated as an
IPv4 address, and anything else is treated as a dNSName.

If an extension name has '[critical]' after it, it will be marked as
critical. Otherwise (by default), it will not be marked as critical.

TLSFeature values can either consist of a named value (currently only
'OCSPMustStaple' which corresponds to status_request) or a numeric TLS
feature value (see rfc7633 for more information).

If a serial number is not explicitly specified, it is automatically
generated based on the contents of the certificate.
"""

import base64
import datetime
import hashlib
import re
import socket
import sys
from struct import pack

import pyct
import pykey
from pyasn1.codec.der import decoder, encoder
from pyasn1.type import constraint, tag, univ, useful
from pyasn1_modules import rfc2459


class Error(Exception):
    """Base class for exceptions in this module."""


class UnknownBaseError(Error):
    """Base class for handling unexpected input in this module.

    Subclasses override ``category`` so that the message names the kind
    of input that was not recognized.
    """

    def __init__(self, value):
        super().__init__()
        self.value = value
        self.category = "input"

    def __str__(self):
        return f'Unknown {self.category} type "{repr(self.value)}"'


class UnknownAlgorithmTypeError(UnknownBaseError):
    """Helper exception type to handle unknown algorithm types."""

    def __init__(self, value):
        super().__init__(value)
        self.category = "algorithm"


class UnknownParameterTypeError(UnknownBaseError):
    """Helper exception type to handle unknown input parameters."""

    def __init__(self, value):
        super().__init__(value)
        self.category = "parameter"


class UnknownExtensionTypeError(UnknownBaseError):
    """Helper exception type to handle unknown input extensions."""

    def __init__(self, value):
        super().__init__(value)
        self.category = "extension"


class UnknownKeyPurposeTypeError(UnknownBaseError):
    """Helper exception type to handle unknown key purposes."""

    def __init__(self, value):
        super().__init__(value)
        self.category = "keyPurpose"


class UnknownKeyTargetError(UnknownBaseError):
    """Helper exception type to handle unknown key targets."""

    def __init__(self, value):
        super().__init__(value)
        self.category = "key target"


class UnknownVersionError(UnknownBaseError):
    """Helper exception type to handle unknown specified versions."""

    def __init__(self, value):
        super().__init__(value)
        self.category = "version"


class UnknownNameConstraintsSpecificationError(UnknownBaseError):
    """Helper exception type to handle unknown specified
    nameConstraints."""

    def __init__(self, value):
        super().__init__(value)
        self.category = "nameConstraints specification"


class UnknownDNTypeError(UnknownBaseError):
    """Helper exception type to handle unknown DN types."""

    def __init__(self, value):
        super().__init__(value)
        self.category = "DN"


class UnknownNSCertTypeError(UnknownBaseError):
    """Helper exception type to handle unknown nsCertType types."""

    def __init__(self, value):
        super().__init__(value)
        self.category = "nsCertType"


class UnknownTLSFeature(UnknownBaseError):
    """Helper exception type to handle unknown TLS Features."""

    def __init__(self, value):
        super().__init__(value)
        self.category = "TLSFeature"


class UnknownDelegatedCredentialError(UnknownBaseError):
    """Helper exception type to handle unknown Delegated Credential args."""

    def __init__(self, value):
        super().__init__(value)
        self.category = "delegatedCredential"


class InvalidSCTSpecification(Error):
    """Helper exception type to handle invalid SCT specifications."""

    def __init__(self, value):
        super().__init__()
        self.value = value

    def __str__(self):
        return f'invalid SCT specification "{self.value}"'


class InvalidSerialNumber(Error):
    """Exception type to handle invalid serial numbers."""

    def __init__(self, value):
        super().__init__()
        self.value = value

    def __str__(self):
        return repr(self.value)


def getASN1Tag(asn1Type):
    """Helper function for returning the base tag value of a given
    type from the pyasn1 package"""
    return asn1Type.tagSet.baseTag.tagId


def stringToAccessDescription(string):
    """Helper function that takes a string representing a URI
    presumably identifying an OCSP authority information access
    location. Returns an AccessDescription usable by pyasn1."""
    accessMethod = rfc2459.id_ad_ocsp
    accessLocation = rfc2459.GeneralName()
    accessLocation["uniformResourceIdentifier"] = string
    sequence = univ.Sequence()
    sequence.setComponentByPosition(0, accessMethod)
    sequence.setComponentByPosition(1, accessLocation)
    return sequence


def stringToDN(string, tag=None):
    """Takes a string representing a distinguished name or directory
    name and returns a Name for use by pyasn1. See the documentation
    for the issuer and subject fields for more details. Takes an
    optional implicit tag in cases where the Name needs to be tagged
    differently.

    Raises UnknownDNTypeError if an unsupported attribute type is
    encountered."""
    if string and "/" not in string:
        string = f"/CN={string}"
    rdns = rfc2459.RDNSequence()
    pattern = r"/(C|ST|L|O|OU|CN|emailAddress)="
    split = re.split(pattern, string)
    # split should now be [[encoding], <type>, <value>, <type>, <value>, ...]
    if split[0]:
        encoding = split[0]
    else:
        encoding = "utf8String"
    for pos, (nameType, value) in enumerate(zip(split[1::2], split[2::2])):
        ava = rfc2459.AttributeTypeAndValue()
        if nameType == "C":
            ava["type"] = rfc2459.id_at_countryName
            nameComponent = rfc2459.X520countryName(value)
        elif nameType == "ST":
            ava["type"] = rfc2459.id_at_stateOrProvinceName
            nameComponent = rfc2459.X520StateOrProvinceName()
        elif nameType == "L":
            ava["type"] = rfc2459.id_at_localityName
            nameComponent = rfc2459.X520LocalityName()
        elif nameType == "O":
            ava["type"] = rfc2459.id_at_organizationName
            nameComponent = rfc2459.X520OrganizationName()
        elif nameType == "OU":
            ava["type"] = rfc2459.id_at_organizationalUnitName
            nameComponent = rfc2459.X520OrganizationalUnitName()
        elif nameType == "CN":
            ava["type"] = rfc2459.id_at_commonName
            nameComponent = rfc2459.X520CommonName()
        elif nameType == "emailAddress":
            ava["type"] = rfc2459.emailAddress
            nameComponent = rfc2459.Pkcs9email(value)
        else:
            raise UnknownDNTypeError(nameType)
        # Country names and email addresses were given their value at
        # construction time above; every other type is a choice of
        # string encodings that still has to be filled in.
        if nameType not in ("C", "emailAddress"):
            # The value may have things like '\0' (i.e. a slash followed by
            # the number zero) that have to be decoded into the resulting
            # '\x00' (i.e. a byte with value zero).
            nameComponent[encoding] = value.encode().decode(encoding="unicode_escape")
        ava["value"] = nameComponent
        rdn = rfc2459.RelativeDistinguishedName()
        rdn.setComponentByPosition(0, ava)
        rdns.setComponentByPosition(pos, rdn)
    if tag:
        name = rfc2459.Name().subtype(implicitTag=tag)
    else:
        name = rfc2459.Name()
    name.setComponentByPosition(0, rdns)
    return name


def stringToAlgorithmIdentifiers(string):
    """Helper function that converts a description of an algorithm
    to a representation usable by the pyasn1 package and a hash
    algorithm constant for use by pykey.

    Returns a tuple of (AlgorithmIdentifier, pykey hash constant).
    Raises UnknownAlgorithmTypeError for an unrecognized description."""
    algorithmIdentifier = rfc2459.AlgorithmIdentifier()
    algorithmType = None
    algorithm = None
    # We add Null parameters for RSA only
    addParameters = False
    if string == "sha1WithRSAEncryption":
        algorithmType = pykey.HASH_SHA1
        algorithm = rfc2459.sha1WithRSAEncryption
        addParameters = True
    elif string == "sha256WithRSAEncryption":
        algorithmType = pykey.HASH_SHA256
        algorithm = univ.ObjectIdentifier("1.2.840.113549.1.1.11")
        addParameters = True
    elif string == "md5WithRSAEncryption":
        algorithmType = pykey.HASH_MD5
        algorithm = rfc2459.md5WithRSAEncryption
        addParameters = True
    elif string == "ecdsaWithSHA256":
        algorithmType = pykey.HASH_SHA256
        algorithm = univ.ObjectIdentifier("1.2.840.10045.4.3.2")
    elif string == "ecdsaWithSHA384":
        algorithmType = pykey.HASH_SHA384
        algorithm = univ.ObjectIdentifier("1.2.840.10045.4.3.3")
    elif string == "ecdsaWithSHA512":
        algorithmType = pykey.HASH_SHA512
        algorithm = univ.ObjectIdentifier("1.2.840.10045.4.3.4")
    else:
        raise UnknownAlgorithmTypeError(string)
    algorithmIdentifier["algorithm"] = algorithm
    if addParameters:
        # Directly setting parameters to univ.Null doesn't currently work.
        nullEncapsulated = encoder.encode(univ.Null())
        algorithmIdentifier["parameters"] = univ.Any(nullEncapsulated)
    return (algorithmIdentifier, algorithmType)


def datetimeToTime(dt):
    """Takes a datetime object and returns an rfc2459.Time object with
    that time as its value as a GeneralizedTime"""
    time = rfc2459.Time()
    time["generalTime"] = useful.GeneralizedTime(dt.strftime("%Y%m%d%H%M%SZ"))
    return time


def serialBytesToString(serialBytes):
    """Takes a list of integers in the interval [0, 255] and returns
    the corresponding serial number as a bytes object holding an ASN.1
    INTEGER (tag byte, length byte, then the given content bytes).

    Raises InvalidSerialNumber if more than 127 bytes are given, since
    the length is emitted as a single byte."""
    serialBytesLen = len(serialBytes)
    if serialBytesLen > 127:
        raise InvalidSerialNumber(f"{serialBytesLen} bytes is too long")
    # Prepend the ASN.1 INTEGER tag and length bytes.
    stringBytes = [getASN1Tag(univ.Integer), serialBytesLen] + serialBytes
    return bytes(stringBytes)


class Certificate:
    """Utility class for reading a certificate specification and
    generating a signed x509 certificate"""

    def __init__(self, paramStream):
        """Populates defaults, then applies the specification read line
        by line from |paramStream| (a file-like object of text lines)."""
        self.versionValue = 2  # a value of 2 is X509v3
        self.signature = "sha256WithRSAEncryption"
        self.issuer = "Default Issuer"
        # 'now' is pinned to midnight on January 1st of the current UTC
        # year rather than the actual current time. Only the year is
        # read, so a timezone-aware 'now' is used purely to avoid the
        # deprecated datetime.utcnow(); self.now itself stays naive.
        currentYear = datetime.datetime.now(datetime.timezone.utc).year
        self.now = datetime.datetime(currentYear, 1, 1)
        aYearAndAWhile = datetime.timedelta(days=400)
        self.notBefore = self.now - aYearAndAWhile
        self.notAfter = self.now + aYearAndAWhile
        self.subject = "Default Subject"
        self.extensions = None
        # The serial number can be automatically generated from the
        # certificate specification. We need this value to depend in
        # part on what extensions are present. self.extensions are
        # pyasn1 objects. Depending on the string representation of
        # these objects can cause the resulting serial number to change
        # unexpectedly, so instead we depend on the original string
        # representation of the extensions as specified.
        self.extensionLines = None
        self.savedEmbeddedSCTListData = None
        self.subjectKey = pykey.keyFromSpecification("default")
        self.issuerKey = pykey.keyFromSpecification("default")
        self.serialNumber = None
        self.decodeParams(paramStream)
        # If a serial number wasn't specified, generate one based on
        # the certificate contents.
        if not self.serialNumber:
            self.serialNumber = self.generateSerialNumber()
        # This has to be last because the SCT signature depends on the
        # contents of the certificate.
        if self.savedEmbeddedSCTListData:
            self.addEmbeddedSCTListData()

    def generateSerialNumber(self):
        """Generates a serial number for this certificate based on its
        contents. Intended to be reproducible for compatibility with
        the build system on OS X (see the comment above main, later in
        this file)."""
        hasher = hashlib.sha256()
        hasher.update(str(self.versionValue).encode())
        hasher.update(self.signature.encode())
        hasher.update(self.issuer.encode())
        hasher.update(str(self.notBefore).encode())
        hasher.update(str(self.notAfter).encode())
        hasher.update(self.subject.encode())
        if self.extensionLines:
            for extensionLine in self.extensionLines:
                hasher.update(extensionLine.encode())
        if self.savedEmbeddedSCTListData:
            # savedEmbeddedSCTListData is
            # (embeddedSCTListSpecification, critical), where |critical|
            # may be None
            hasher.update(self.savedEmbeddedSCTListData[0].encode())
            if self.savedEmbeddedSCTListData[1]:
                hasher.update(self.savedEmbeddedSCTListData[1].encode())
        serialBytes = list(hasher.digest()[:20])
        # Ensure that the most significant bit isn't set (which would
        # indicate a negative number, which isn't valid for serial
        # numbers).
        serialBytes[0] &= 0x7F
        # Also ensure that the least significant bit on the most
        # significant byte is set (to prevent a leading zero byte,
        # which also wouldn't be valid).
        serialBytes[0] |= 0x01
        return serialBytesToString(serialBytes)

    def decodeParams(self, paramStream):
        """Applies every line of |paramStream| as one parameter.

        Each line is stripped of surrounding whitespace first. Note that
        a blank line is not skipped: it reaches decodeParam with an
        empty parameter name and is rejected there."""
        for line in paramStream:
            self.decodeParam(line.strip())

    def decodeParam(self, line):
        """Applies a single '<param>:<value>' specification line.

        Raises UnknownParameterTypeError for an unrecognized parameter
        name and InvalidSerialNumber for a serial number outside
        [1, 127]."""
        # Only the first ':' separates the name from the value; the
        # value itself may contain further colons.
        param, _, value = line.partition(":")
        if param == "version":
            self.setVersion(value)
        elif param == "subject":
            self.subject = value
        elif param == "issuer":
            self.issuer = value
        elif param == "validity":
            self.decodeValidity(value)
        elif param == "extension":
            self.decodeExtension(value)
        elif param == "issuerKey":
            self.setupKey("issuer", value)
        elif param == "subjectKey":
            self.setupKey("subject", value)
        elif param == "signature":
            self.signature = value
        elif param == "serialNumber":
            serialNumber = int(value)
            # Ensure only serial numbers that conform to the rules listed in
            # generateSerialNumber() are permitted.
            if serialNumber < 1 or serialNumber > 127:
                raise InvalidSerialNumber(value)
            self.serialNumber = serialBytesToString([serialNumber])
        else:
            raise UnknownParameterTypeError(param)

    def setVersion(self, version):
        """Sets the certificate version from a 1-based version string.

        The stored value is the version minus one. Raises
        UnknownVersionError if the version is not in [1, 4]."""
        intVersion = int(version)
        if 1 <= intVersion <= 4:
            self.versionValue = intVersion - 1
        else:
            raise UnknownVersionError(version)

    def decodeValidity(self, duration):
        """Sets notBefore/notAfter from either 'YYYYMMDD-YYYYMMDD' or a
        duration in days centered on self.now."""
        match = re.search("([0-9]{8})-([0-9]{8})", duration)
        if match:
            self.notBefore = datetime.datetime.strptime(match.group(1), "%Y%m%d")
            self.notAfter = datetime.datetime.strptime(match.group(2), "%Y%m%d")
        else:
            delta = datetime.timedelta(days=(int(duration) / 2))
            self.notBefore = self.now - delta
            self.notAfter = self.now + delta

    def decodeExtension(self, extension):
        """Parses '<name>[[critical]]:<data>' and adds that extension.

        Raises UnknownExtensionTypeError if the line cannot be parsed or
        names an extension this module does not know."""
        match = re.search(r"([a-zA-Z]+)(\[critical\])?:(.*)", extension)
        if not match:
            raise UnknownExtensionTypeError(extension)
        extensionType = match.group(1)
        critical = match.group(2)
        value = match.group(3)
        if extensionType == "basicConstraints":
            self.addBasicConstraints(value, critical)
        elif extensionType == "keyUsage":
            self.addKeyUsage(value, critical)
        elif extensionType == "extKeyUsage":
            self.addExtKeyUsage(value, critical)
        elif extensionType == "subjectAlternativeName":
            self.addSubjectAlternativeName(value, critical)
        elif extensionType == "authorityInformationAccess":
            self.addAuthorityInformationAccess(value, critical)
        elif extensionType == "certificatePolicies":
            self.addCertificatePolicies(value, critical)
        elif extensionType == "nameConstraints":
            self.addNameConstraints(value, critical)
        elif extensionType == "nsCertType":
            self.addNSCertType(value, critical)
        elif extensionType == "TLSFeature":
            self.addTLSFeature(value, critical)
        elif extensionType == "embeddedSCTList":
            # Deferred: the SCT list is only built at the end of
            # __init__, once the rest of the certificate is known.
            self.savedEmbeddedSCTListData = (value, critical)
        elif extensionType == "delegationUsage":
            self.addDelegationUsage(critical)
        elif extensionType == "qcStatements":
            self.addQCStatements(value, critical)
        else:
            raise UnknownExtensionTypeError(extensionType)

        # The embedded SCT list is hashed separately by
        # generateSerialNumber(), so it is not recorded here.
        if extensionType != "embeddedSCTList":
            if not self.extensionLines:
                self.extensionLines = []
            self.extensionLines.append(extension)

    def setupKey(self, subjectOrIssuer, value):
        """Replaces the subject or issuer key with the one described by
        the key specification |value|.

        Raises UnknownKeyTargetError if |subjectOrIssuer| is neither
        'subject' nor 'issuer'."""
        if subjectOrIssuer == "subject":
            self.subjectKey = pykey.keyFromSpecification(value)
        elif subjectOrIssuer == "issuer":
            self.issuerKey = pykey.keyFromSpecification(value)
        else:
            raise UnknownKeyTargetError(subjectOrIssuer)

    def addExtension(self, extensionType, extensionValue, critical):
        """DER-encodes |extensionValue|, wraps it in an Extension with
        the OID |extensionType| and appends it to self.extensions."""
        if not self.extensions:
            self.extensions = []
        encapsulated = univ.OctetString(encoder.encode(extensionValue))
        extension = rfc2459.Extension()
        extension["extnID"] = extensionType
        # critical is either the string '[critical]' or None.
        # We only care whether or not it is truthy.
        if critical:
            extension["critical"] = True
        extension["extnValue"] = encapsulated
        self.extensions.append(extension)

    def addBasicConstraints(self, basicConstraints, critical):
        """Adds a basicConstraints extension from
        '[cA],[pathLenConstraint]'.

        A specification without a comma is accepted and treated as
        having no pathLenConstraint."""
        parts = basicConstraints.split(",")
        cA = parts[0]
        pathLenConstraint = parts[1] if len(parts) > 1 else ""
        basicConstraintsExtension = rfc2459.BasicConstraints()
        basicConstraintsExtension["cA"] = cA == "cA"
        if pathLenConstraint:
            pathLenConstraintValue = univ.Integer(int(pathLenConstraint)).subtype(
                subtypeSpec=constraint.ValueRangeConstraint(0, float("inf"))
            )
            basicConstraintsExtension["pathLenConstraint"] = pathLenConstraintValue
        self.addExtension(
            rfc2459.id_ce_basicConstraints, basicConstraintsExtension, critical
        )

    def addKeyUsage(self, keyUsage, critical):
        """Adds a keyUsage extension from a comma-separated list of
        usage names."""
        keyUsageExtension = rfc2459.KeyUsage(keyUsage)
        self.addExtension(rfc2459.id_ce_keyUsage, keyUsageExtension, critical)

    def keyPurposeToOID(self, keyPurpose):
        """Maps an extended key usage name to its object identifier.

        Raises UnknownKeyPurposeTypeError for an unrecognized name."""
        if keyPurpose == "serverAuth":
            return rfc2459.id_kp_serverAuth
        if keyPurpose == "clientAuth":
            return rfc2459.id_kp_clientAuth
        if keyPurpose == "codeSigning":
            return rfc2459.id_kp_codeSigning
        if keyPurpose == "emailProtection":
            return rfc2459.id_kp_emailProtection
        if keyPurpose == "nsSGC":
            return univ.ObjectIdentifier("2.16.840.1.113730.4.1")
        if keyPurpose == "OCSPSigning":
            return univ.ObjectIdentifier("1.3.6.1.5.5.7.3.9")
        if keyPurpose == "timeStamping":
            return rfc2459.id_kp_timeStamping
        if keyPurpose == "tlsBinding":
            return univ.ObjectIdentifier("0.4.0.194115.1.0")
        raise UnknownKeyPurposeTypeError(keyPurpose)

    def addExtKeyUsage(self, extKeyUsage, critical):
        """Adds an extKeyUsage extension from a comma-separated list of
        key purpose names."""
        extKeyUsageExtension = rfc2459.ExtKeyUsageSyntax()
        for count, keyPurpose in enumerate(extKeyUsage.split(",")):
            extKeyUsageExtension.setComponentByPosition(
                count, self.keyPurposeToOID(keyPurpose)
            )
        self.addExtension(rfc2459.id_ce_extKeyUsage, extKeyUsageExtension, critical)

    def addSubjectAlternativeName(self, names, critical):
        """Adds a subjectAlternativeName extension from a
        comma-separated list of names.

        The kind of each name is inferred from its text: a '/' means
        directoryName, otherwise an '@' means rfc822Name, otherwise an
        'ip4:' prefix means an IPv4 address, and anything else is a
        dNSName."""
        IPV4_PREFIX = "ip4:"

        subjectAlternativeName = rfc2459.SubjectAltName()
        for count, name in enumerate(names.split(",")):
            generalName = rfc2459.GeneralName()
            if "/" in name:
                directoryName = stringToDN(
                    name, tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4)
                )
                generalName["directoryName"] = directoryName
            elif "@" in name:
                generalName["rfc822Name"] = name
            elif name.startswith(IPV4_PREFIX):
                generalName["iPAddress"] = socket.inet_pton(
                    socket.AF_INET, name[len(IPV4_PREFIX) :]
                )
            else:
                # The string may have things like '\0' (i.e. a slash
                # followed by the number zero) that have to be decoded into
                # the resulting '\x00' (i.e. a byte with value zero).
                generalName["dNSName"] = name.encode().decode("unicode_escape")
            subjectAlternativeName.setComponentByPosition(count, generalName)
        self.addExtension(
            rfc2459.id_ce_subjectAltName, subjectAlternativeName, critical
        )

    def addAuthorityInformationAccess(self, ocspURI, critical):
        """Adds an authorityInformationAccess extension holding a single
        OCSP access description for |ocspURI|."""
        sequence = univ.Sequence()
        accessDescription = stringToAccessDescription(ocspURI)
        sequence.setComponentByPosition(0, accessDescription)
        self.addExtension(rfc2459.id_pe_authorityInfoAccess, sequence, critical)

    def addCertificatePolicies(self, policyOIDs, critical):
        """Adds a certificatePolicies extension from a comma-separated
        list of policy OIDs. The name 'any' is accepted as an alias for
        the OID 2.5.29.32.0."""
        policies = rfc2459.CertificatePolicies()
        for pos, policyOID in enumerate(policyOIDs.split(",")):
            policyOIDMapped = policyOID
            if policyOIDMapped == "any":
                policyOIDMapped = "2.5.29.32.0"
            policy = rfc2459.PolicyInformation()
            policyIdentifier = rfc2459.CertPolicyId(policyOIDMapped)
            policy["policyIdentifier"] = policyIdentifier
            policies.setComponentByPosition(pos, policy)
        self.addExtension(rfc2459.id_ce_certificatePolicies, policies, critical)

    def addNameConstraints(self, constraints, critical):
        """Adds a nameConstraints extension from
        '{permitted,excluded}:<name>,...'.

        Names containing '/' are treated as directoryNames and all
        others as dNSNames. Raises
        UnknownNameConstraintsSpecificationError if the specification
        starts with neither 'permitted:' nor 'excluded:'."""
        nameConstraints = rfc2459.NameConstraints()
        if constraints.startswith("permitted:"):
            (subtreesType, subtreesTag) = ("permittedSubtrees", 0)
        elif constraints.startswith("excluded:"):
            (subtreesType, subtreesTag) = ("excludedSubtrees", 1)
        else:
            raise UnknownNameConstraintsSpecificationError(constraints)
        generalSubtrees = rfc2459.GeneralSubtrees().subtype(
            implicitTag=tag.Tag(
                tag.tagClassContext, tag.tagFormatConstructed, subtreesTag
            )
        )
        subtrees = constraints[(constraints.find(":") + 1) :]
        for pos, name in enumerate(subtrees.split(",")):
            generalName = rfc2459.GeneralName()
            if "/" in name:
                directoryName = stringToDN(
                    name, tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4)
                )
                generalName["directoryName"] = directoryName
            else:
                generalName["dNSName"] = name
            generalSubtree = rfc2459.GeneralSubtree()
            generalSubtree["base"] = generalName
            generalSubtrees.setComponentByPosition(pos, generalSubtree)
        nameConstraints[subtreesType] = generalSubtrees
        self.addExtension(rfc2459.id_ce_nameConstraints, nameConstraints, critical)

    def addNSCertType(self, certType, critical):
        """Adds an nsCertType extension. Only 'sslServer' is supported;
        anything else raises UnknownNSCertTypeError."""
        if certType != "sslServer":
            raise UnknownNSCertTypeError(certType)
        self.addExtension(
            univ.ObjectIdentifier("2.16.840.1.113730.1.1"),
            univ.BitString("'01'B"),
            critical,
        )

    def addDelegationUsage(self, critical):
        """Adds a delegationUsage extension. Marking it critical is
        rejected with UnknownDelegatedCredentialError."""
        if critical:
            raise UnknownDelegatedCredentialError(critical)
        self.addExtension(
            univ.ObjectIdentifier("1.3.6.1.4.1.44363.44"), univ.Null(), critical
        )

    def addTLSFeature(self, features, critical):
        """Adds a TLSFeature extension from a comma-separated list of
        feature names or numeric feature values.

        Raises UnknownTLSFeature for an entry that is neither an integer
        nor a known feature name."""
        namedFeatures = {"OCSPMustStaple": 5}
        featureList = [f.strip() for f in features.split(",")]
        sequence = univ.Sequence()
        for pos, feature in enumerate(featureList):
            try:
                featureValue = int(feature)
            except ValueError:
                # Not numeric, so it has to be one of the named features.
                try:
                    featureValue = namedFeatures[feature]
                except KeyError:
                    raise UnknownTLSFeature(feature) from None
            sequence.setComponentByPosition(pos, univ.Integer(featureValue))
        self.addExtension(
            univ.ObjectIdentifier("1.3.6.1.5.5.7.1.24"), sequence, critical
        )

    def addEmbeddedSCTListData(self):
        """Builds the embedded SCT list extension from the specification
        saved by decodeExtension and adds it to the certificate.

        Each SCT is signed over the TBSCertificate as it stands before
        this extension is added. Raises InvalidSCTSpecification for an
        entry that cannot be parsed."""
        (scts, critical) = self.savedEmbeddedSCTListData
        encodedSCTs = []
        for sctSpec in scts.split(","):
            match = re.search(r"(\w+):(\d{8}):?(\d+)?", sctSpec)
            if not match:
                raise InvalidSCTSpecification(sctSpec)
            keySpec = match.group(1)
            leafIndex = match.group(3)
            if leafIndex:
                leafIndex = int(leafIndex)
            key = pykey.keyFromSpecification(keySpec)
            time = datetime.datetime.strptime(match.group(2), "%Y%m%d")
            tbsCertificate = self.getTBSCertificate()
            tbsDER = encoder.encode(tbsCertificate)
            sct = pyct.SCT(
                key, time, pyct.PrecertEntry(tbsDER, self.issuerKey), leafIndex
            )
            signed = sct.signAndEncode()
            # Every SCT is preceded by its length as a big-endian
            # 16-bit integer.
            lengthPrefix = pack("!H", len(signed))
            encodedSCTs.append(lengthPrefix + signed)
        encodedSCTBytes = b"".join(encodedSCTs)
        # The list as a whole gets the same kind of length prefix.
        lengthPrefix = pack("!H", len(encodedSCTBytes))
        extensionBytes = lengthPrefix + encodedSCTBytes
        self.addExtension(
            univ.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.2"),
            univ.OctetString(extensionBytes),
            critical,
        )

    def addQCStatements(self, qcStatements, critical):
        """Adds a qcStatements extension from a comma-separated list of
        '<statement OID>[:<info OID>]' entries."""
        sequence = univ.Sequence()
        for pos, qcStatement in enumerate(qcStatements.split(",")):
            parts = qcStatement.split(":")
            statementID = parts[0]
            statementInfo = None
            if len(parts) > 1:
                statementInfo = parts[1]
            qcStatementSequence = univ.Sequence()
            qcStatementSequence.setComponentByPosition(
                0, univ.ObjectIdentifier(statementID)
            )
            if statementInfo:
                statementInfoSequence = univ.Sequence()
                statementInfoSequence.setComponentByPosition(
                    0, univ.ObjectIdentifier(statementInfo)
                )
                qcStatementSequence.setComponentByPosition(1, statementInfoSequence)
            sequence.setComponentByPosition(pos, qcStatementSequence)
        self.addExtension(
            univ.ObjectIdentifier("1.3.6.1.5.5.7.1.3"), sequence, critical
        )

    def getVersion(self):
        """Returns the version as an explicitly tagged rfc2459.Version."""
        return rfc2459.Version(self.versionValue).subtype(
            explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)
        )

    def getSerialNumber(self):
        """Returns the serial number decoded from its DER bytes."""
        return decoder.decode(self.serialNumber)[0]

    def getIssuer(self):
        """Returns the issuer as a Name built by stringToDN."""
        return stringToDN(self.issuer)

    def getValidity(self):
        """Returns an rfc2459.Validity holding notBefore and notAfter."""
        validity = rfc2459.Validity()
        validity["notBefore"] = self.getNotBefore()
        validity["notAfter"] = self.getNotAfter()
        return validity

    def getNotBefore(self):
        """Returns notBefore as an rfc2459.Time."""
        return datetimeToTime(self.notBefore)

    def getNotAfter(self):
        """Returns notAfter as an rfc2459.Time."""
        return datetimeToTime(self.notAfter)

    def getSubject(self):
        """Returns the subject as a Name built by stringToDN."""
        return stringToDN(self.subject)

    def getTBSCertificate(self):
        """Assembles and returns the rfc2459.TBSCertificate from the
        current state, including any extensions added so far."""
        (signatureOID, _) = stringToAlgorithmIdentifiers(self.signature)
        tbsCertificate = rfc2459.TBSCertificate()
        tbsCertificate["version"] = self.getVersion()
        tbsCertificate["serialNumber"] = self.getSerialNumber()
        tbsCertificate["signature"] = signatureOID
        tbsCertificate["issuer"] = self.getIssuer()
        tbsCertificate["validity"] = self.getValidity()
        tbsCertificate["subject"] = self.getSubject()
        tbsCertificate["subjectPublicKeyInfo"] = (
            self.subjectKey.asSubjectPublicKeyInfo()
        )
        if self.extensions:
            extensions = rfc2459.Extensions().subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3)
            )
            for count, extension in enumerate(self.extensions):
                extensions.setComponentByPosition(count, extension)
            tbsCertificate["extensions"] = extensions
        return tbsCertificate

    def toDER(self):
        """Signs the TBSCertificate with the issuer key and returns the
        DER encoding of the complete certificate."""
        (signatureOID, hashAlgorithm) = stringToAlgorithmIdentifiers(self.signature)
        certificate = rfc2459.Certificate()
        tbsCertificate = self.getTBSCertificate()
        certificate["tbsCertificate"] = tbsCertificate
        certificate["signatureAlgorithm"] = signatureOID
        tbsDER = encoder.encode(tbsCertificate)
        certificate["signatureValue"] = self.issuerKey.sign(tbsDER, hashAlgorithm)
        return encoder.encode(certificate)

    def toPEM(self):
        """Returns the certificate as PEM text: base64 of the DER
        encoding wrapped at 64 characters between the BEGIN/END
        CERTIFICATE markers, with no trailing newline."""
        b64 = base64.b64encode(self.toDER()).decode()
        lines = ["-----BEGIN CERTIFICATE-----"]
        lines.extend(b64[start : start + 64] for start in range(0, len(b64), 64))
        lines.append("-----END CERTIFICATE-----")
        return "\n".join(lines)


# The build harness will call this function with an output
# file-like object and a path to a file containing a
# specification. This will read the specification and output
# the certificate as PEM.
def main(output, inputPath):
    with open(inputPath) as configStream:
        output.write(Certificate(configStream).toPEM() + "\n")


# When run as a standalone program, this will read a specification from
# stdin and output the certificate as PEM to stdout.
if __name__ == "__main__":
    print(Certificate(sys.stdin).toPEM())