tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

pycert.py (33616B)


      1 #!/usr/bin/env python
      2 #
      3 # This Source Code Form is subject to the terms of the Mozilla Public
      4 # License, v. 2.0. If a copy of the MPL was not distributed with this
      5 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
      6 
      7 """
      8 Reads a certificate specification from stdin or a file and outputs a
      9 signed x509 certificate with the desired properties.
     10 
     11 The input format is as follows:
     12 
     13 issuer:<issuer distinguished name specification>
     14 subject:<subject distinguished name specification>
     15 [version:{1,2,3,4}]
     16 [validity:<YYYYMMDD-YYYYMMDD|duration in days>]
     17 [issuerKey:<key specification>]
     18 [subjectKey:<key specification>]
     19 [signature:{sha256WithRSAEncryption,sha1WithRSAEncryption,
     20            md5WithRSAEncryption,ecdsaWithSHA256,ecdsaWithSHA384,
     21            ecdsaWithSHA512}]
     22 [serialNumber:<integer in the interval [1, 127]>]
     23 [extension:<extension name:<extension-specific data>>]
     24 [...]
     25 
     26 Known extensions are:
     27 basicConstraints:[cA],[pathLenConstraint]
     28 keyUsage:[digitalSignature,nonRepudiation,keyEncipherment,
     29          dataEncipherment,keyAgreement,keyCertSign,cRLSign]
     30 extKeyUsage:[serverAuth,clientAuth,codeSigning,emailProtection,
     31             nsSGC, # Netscape Server Gated Crypto
     32             OCSPSigning,timeStamping,tlsBinding]
     33 subjectAlternativeName:[<dNSName|directoryName|"ip4:"iPV4Address>,...]
     34 authorityInformationAccess:<OCSP URI>
     35 certificatePolicies:[<policy OID>,...]
     36 nameConstraints:{permitted,excluded}:[<dNSName|directoryName>,...]
     37 nsCertType:sslServer
     38 TLSFeature:[<TLSFeature>,...]
     39 embeddedSCTList:[<key specification>:<YYYYMMDD>[:<leaf index>],...]
     40 delegationUsage:
     41 qcStatements:[<statement OID[:info OID]>,...]
     42 
     43 Where:
     44  [] indicates an optional field or component of a field
     45  <> indicates a required component of a field
     46  {} indicates a choice of exactly one value among a set of values
     47  [a,b,c] indicates a list of potential values, of which zero or more
     48          may be used
     49 
     50 For instance, the version field is optional. However, if it is
     51 specified, it must have exactly one value from the set {1,2,3,4}.
     52 
     53 Most fields have reasonable default values. By default one shared RSA
     54 key is used for all signatures and subject public key information
     55 fields. Using "issuerKey:<key specification>" or
     56 "subjectKey:<key specification>" causes a different key to be used for
     57 signing or as the subject public key information field, respectively.
     58 See pykey.py for the list of available specifications.
     59 The signature algorithm is sha256WithRSAEncryption by default.
     60 
     61 The validity period may be specified as either concrete notBefore and
     62 notAfter values or as a validity period centered around 'now'. For the
     63 latter, this will result in a notBefore of 'now' - duration/2 and a
     64 notAfter of 'now' + duration/2.
     65 
     66 Issuer and subject distinguished name specifications are of the form
     67 '[stringEncoding]/C=XX/O=Example/CN=example.com'. C (country name), ST
     68 (state or province name), L (locality name), O (organization name), OU
     69 (organizational unit name), CN (common name) and emailAddress (email
     70 address) are currently supported. The optional stringEncoding field may
     71 be 'utf8String' or 'printableString'. If the given string does not
     72 contain a '/', it is assumed to represent a common name. If an empty
     73 string is provided, then an empty distinguished name is returned.
     74 DirectoryNames also use this format. When specifying a directoryName in
     75 a nameConstraints extension, the implicit form may not be used.
     76 
     77 If an extension name has '[critical]' after it, it will be marked as
     78 critical. Otherwise (by default), it will not be marked as critical.
     79 
     80 TLSFeature values can either consist of a named value (currently only
     81 'OCSPMustStaple' which corresponds to status_request) or a numeric TLS
     82 feature value (see rfc7633 for more information).
     83 
     84 If a serial number is not explicitly specified, it is automatically
     85 generated based on the contents of the certificate.
     86 """
     87 
     88 import base64
     89 import datetime
     90 import hashlib
     91 import re
     92 import socket
     93 import sys
     94 from struct import pack
     95 
     96 import pyct
     97 import pykey
     98 from pyasn1.codec.der import decoder, encoder
     99 from pyasn1.type import constraint, tag, univ, useful
    100 from pyasn1_modules import rfc2459
    101 
    102 
    103 class Error(Exception):
    104    """Base class for exceptions in this module."""
    105 
    106    pass
    107 
    108 
    109 class UnknownBaseError(Error):
    110    """Base class for handling unexpected input in this module."""
    111 
    112    def __init__(self, value):
    113        super().__init__()
    114        self.value = value
    115        self.category = "input"
    116 
    117    def __str__(self):
    118        return f'Unknown {self.category} type "{repr(self.value)}"'
    119 
    120 
    121 class UnknownAlgorithmTypeError(UnknownBaseError):
    122    """Helper exception type to handle unknown algorithm types."""
    123 
    124    def __init__(self, value):
    125        UnknownBaseError.__init__(self, value)
    126        self.category = "algorithm"
    127 
    128 
    129 class UnknownParameterTypeError(UnknownBaseError):
    130    """Helper exception type to handle unknown input parameters."""
    131 
    132    def __init__(self, value):
    133        UnknownBaseError.__init__(self, value)
    134        self.category = "parameter"
    135 
    136 
    137 class UnknownExtensionTypeError(UnknownBaseError):
    138    """Helper exception type to handle unknown input extensions."""
    139 
    140    def __init__(self, value):
    141        UnknownBaseError.__init__(self, value)
    142        self.category = "extension"
    143 
    144 
    145 class UnknownKeyPurposeTypeError(UnknownBaseError):
    146    """Helper exception type to handle unknown key purposes."""
    147 
    148    def __init__(self, value):
    149        UnknownBaseError.__init__(self, value)
    150        self.category = "keyPurpose"
    151 
    152 
    153 class UnknownKeyTargetError(UnknownBaseError):
    154    """Helper exception type to handle unknown key targets."""
    155 
    156    def __init__(self, value):
    157        UnknownBaseError.__init__(self, value)
    158        self.category = "key target"
    159 
    160 
    161 class UnknownVersionError(UnknownBaseError):
    162    """Helper exception type to handle unknown specified versions."""
    163 
    164    def __init__(self, value):
    165        UnknownBaseError.__init__(self, value)
    166        self.category = "version"
    167 
    168 
    169 class UnknownNameConstraintsSpecificationError(UnknownBaseError):
    170    """Helper exception type to handle unknown specified
    171    nameConstraints."""
    172 
    173    def __init__(self, value):
    174        UnknownBaseError.__init__(self, value)
    175        self.category = "nameConstraints specification"
    176 
    177 
    178 class UnknownDNTypeError(UnknownBaseError):
    179    """Helper exception type to handle unknown DN types."""
    180 
    181    def __init__(self, value):
    182        UnknownBaseError.__init__(self, value)
    183        self.category = "DN"
    184 
    185 
    186 class UnknownNSCertTypeError(UnknownBaseError):
    187    """Helper exception type to handle unknown nsCertType types."""
    188 
    189    def __init__(self, value):
    190        UnknownBaseError.__init__(self, value)
    191        self.category = "nsCertType"
    192 
    193 
    194 class UnknownTLSFeature(UnknownBaseError):
    195    """Helper exception type to handle unknown TLS Features."""
    196 
    197    def __init__(self, value):
    198        UnknownBaseError.__init__(self, value)
    199        self.category = "TLSFeature"
    200 
    201 
    202 class UnknownDelegatedCredentialError(UnknownBaseError):
    203    """Helper exception type to handle unknown Delegated Credential args."""
    204 
    205    def __init__(self, value):
    206        UnknownBaseError.__init__(self, value)
    207        self.category = "delegatedCredential"
    208 
    209 
    210 class InvalidSCTSpecification(Error):
    211    """Helper exception type to handle invalid SCT specifications."""
    212 
    213    def __init__(self, value):
    214        super().__init__()
    215        self.value = value
    216 
    217    def __str__(self):
    218        return f'invalid SCT specification "{self.value}"'
    219 
    220 
    221 class InvalidSerialNumber(Error):
    222    """Exception type to handle invalid serial numbers."""
    223 
    224    def __init__(self, value):
    225        super().__init__()
    226        self.value = value
    227 
    228    def __str__(self):
    229        return repr(self.value)
    230 
    231 
    232 def getASN1Tag(asn1Type):
    233    """Helper function for returning the base tag value of a given
    234    type from the pyasn1 package"""
    235    return asn1Type.tagSet.baseTag.tagId
    236 
    237 
    238 def stringToAccessDescription(string):
    239    """Helper function that takes a string representing a URI
    240    presumably identifying an OCSP authority information access
    241    location. Returns an AccessDescription usable by pyasn1."""
    242    accessMethod = rfc2459.id_ad_ocsp
    243    accessLocation = rfc2459.GeneralName()
    244    accessLocation["uniformResourceIdentifier"] = string
    245    sequence = univ.Sequence()
    246    sequence.setComponentByPosition(0, accessMethod)
    247    sequence.setComponentByPosition(1, accessLocation)
    248    return sequence
    249 
    250 
    251 def stringToDN(string, tag=None):
    252    """Takes a string representing a distinguished name or directory
    253    name and returns a Name for use by pyasn1. See the documentation
    254    for the issuer and subject fields for more details. Takes an
    255    optional implicit tag in cases where the Name needs to be tagged
    256    differently."""
    257    if string and "/" not in string:
    258        string = f"/CN={string}"
    259    rdns = rfc2459.RDNSequence()
    260    pattern = "/(C|ST|L|O|OU|CN|emailAddress)="
    261    split = re.split(pattern, string)
    262    # split should now be [[encoding], <type>, <value>, <type>, <value>, ...]
    263    if split[0]:
    264        encoding = split[0]
    265    else:
    266        encoding = "utf8String"
    267    for pos, (nameType, value) in enumerate(zip(split[1::2], split[2::2])):
    268        ava = rfc2459.AttributeTypeAndValue()
    269        if nameType == "C":
    270            ava["type"] = rfc2459.id_at_countryName
    271            nameComponent = rfc2459.X520countryName(value)
    272        elif nameType == "ST":
    273            ava["type"] = rfc2459.id_at_stateOrProvinceName
    274            nameComponent = rfc2459.X520StateOrProvinceName()
    275        elif nameType == "L":
    276            ava["type"] = rfc2459.id_at_localityName
    277            nameComponent = rfc2459.X520LocalityName()
    278        elif nameType == "O":
    279            ava["type"] = rfc2459.id_at_organizationName
    280            nameComponent = rfc2459.X520OrganizationName()
    281        elif nameType == "OU":
    282            ava["type"] = rfc2459.id_at_organizationalUnitName
    283            nameComponent = rfc2459.X520OrganizationalUnitName()
    284        elif nameType == "CN":
    285            ava["type"] = rfc2459.id_at_commonName
    286            nameComponent = rfc2459.X520CommonName()
    287        elif nameType == "emailAddress":
    288            ava["type"] = rfc2459.emailAddress
    289            nameComponent = rfc2459.Pkcs9email(value)
    290        else:
    291            raise UnknownDNTypeError(nameType)
    292        if not nameType == "C" and not nameType == "emailAddress":
    293            # The value may have things like '\0' (i.e. a slash followed by
    294            # the number zero) that have to be decoded into the resulting
    295            # '\x00' (i.e. a byte with value zero).
    296            nameComponent[encoding] = value.encode().decode(encoding="unicode_escape")
    297        ava["value"] = nameComponent
    298        rdn = rfc2459.RelativeDistinguishedName()
    299        rdn.setComponentByPosition(0, ava)
    300        rdns.setComponentByPosition(pos, rdn)
    301    if tag:
    302        name = rfc2459.Name().subtype(implicitTag=tag)
    303    else:
    304        name = rfc2459.Name()
    305    name.setComponentByPosition(0, rdns)
    306    return name
    307 
    308 
    309 def stringToAlgorithmIdentifiers(string):
    310    """Helper function that converts a description of an algorithm
    311    to a representation usable by the pyasn1 package and a hash
    312    algorithm constant for use by pykey."""
    313    algorithmIdentifier = rfc2459.AlgorithmIdentifier()
    314    algorithmType = None
    315    algorithm = None
    316    # We add Null parameters for RSA only
    317    addParameters = False
    318    if string == "sha1WithRSAEncryption":
    319        algorithmType = pykey.HASH_SHA1
    320        algorithm = rfc2459.sha1WithRSAEncryption
    321        addParameters = True
    322    elif string == "sha256WithRSAEncryption":
    323        algorithmType = pykey.HASH_SHA256
    324        algorithm = univ.ObjectIdentifier("1.2.840.113549.1.1.11")
    325        addParameters = True
    326    elif string == "md5WithRSAEncryption":
    327        algorithmType = pykey.HASH_MD5
    328        algorithm = rfc2459.md5WithRSAEncryption
    329        addParameters = True
    330    elif string == "ecdsaWithSHA256":
    331        algorithmType = pykey.HASH_SHA256
    332        algorithm = univ.ObjectIdentifier("1.2.840.10045.4.3.2")
    333    elif string == "ecdsaWithSHA384":
    334        algorithmType = pykey.HASH_SHA384
    335        algorithm = univ.ObjectIdentifier("1.2.840.10045.4.3.3")
    336    elif string == "ecdsaWithSHA512":
    337        algorithmType = pykey.HASH_SHA512
    338        algorithm = univ.ObjectIdentifier("1.2.840.10045.4.3.4")
    339    else:
    340        raise UnknownAlgorithmTypeError(string)
    341    algorithmIdentifier["algorithm"] = algorithm
    342    if addParameters:
    343        # Directly setting parameters to univ.Null doesn't currently work.
    344        nullEncapsulated = encoder.encode(univ.Null())
    345        algorithmIdentifier["parameters"] = univ.Any(nullEncapsulated)
    346    return (algorithmIdentifier, algorithmType)
    347 
    348 
    349 def datetimeToTime(dt):
    350    """Takes a datetime object and returns an rfc2459.Time object with
    351    that time as its value as a GeneralizedTime"""
    352    time = rfc2459.Time()
    353    time["generalTime"] = useful.GeneralizedTime(dt.strftime("%Y%m%d%H%M%SZ"))
    354    return time
    355 
    356 
    357 def serialBytesToString(serialBytes):
    358    """Takes a list of integers in the interval [0, 255] and returns
    359    the corresponding serial number string."""
    360    serialBytesLen = len(serialBytes)
    361    if serialBytesLen > 127:
    362        raise InvalidSerialNumber(f"{serialBytesLen} bytes is too long")
    363    # Prepend the ASN.1 INTEGER tag and length bytes.
    364    stringBytes = [getASN1Tag(univ.Integer), serialBytesLen] + serialBytes
    365    return bytes(stringBytes)
    366 
    367 
    368 class Certificate:
    369    """Utility class for reading a certificate specification and
    370    generating a signed x509 certificate"""
    371 
    def __init__(self, paramStream):
        """Set defaults, then apply the specification read from
        paramStream (an object with readlines(), one parameter per
        line).

        Defaults: X509v3, sha256WithRSAEncryption, the "default" pykey
        key for both issuer and subject, and a validity window of
        400 days either side of `self.now`."""
        self.versionValue = 2  # a value of 2 is X509v3
        self.signature = "sha256WithRSAEncryption"
        self.issuer = "Default Issuer"
        # `now` is midnight on January 1st of the current UTC year, as
        # a naive datetime. Only the year is taken from the real clock
        # (datetime.utcnow() is deprecated, so use an aware now()).
        currentYear = datetime.datetime.now(datetime.timezone.utc).year
        self.now = datetime.datetime(currentYear, 1, 1)
        aYearAndAWhile = datetime.timedelta(days=400)
        self.notBefore = self.now - aYearAndAWhile
        self.notAfter = self.now + aYearAndAWhile
        self.subject = "Default Subject"
        self.extensions = None
        # The serial number can be automatically generated from the
        # certificate specification. We need this value to depend in
        # part of what extensions are present. self.extensions are
        # pyasn1 objects. Depending on the string representation of
        # these objects can cause the resulting serial number to change
        # unexpectedly, so instead we depend on the original string
        # representation of the extensions as specified.
        self.extensionLines = None
        self.savedEmbeddedSCTListData = None
        self.subjectKey = pykey.keyFromSpecification("default")
        self.issuerKey = pykey.keyFromSpecification("default")
        self.serialNumber = None
        self.decodeParams(paramStream)
        # If a serial number wasn't specified, generate one based on
        # the certificate contents.
        if not self.serialNumber:
            self.serialNumber = self.generateSerialNumber()
        # This has to be last because the SCT signature depends on the
        # contents of the certificate.
        if self.savedEmbeddedSCTListData:
            self.addEmbeddedSCTListData()
    404 
    def generateSerialNumber(self):
        """Derive a serial number from the certificate's contents.

        A SHA-256 hash is taken over the version, signature algorithm,
        issuer, validity bounds, subject, the extension lines as
        originally specified and any saved embedded SCT list data.
        The first 20 bytes of the digest become the serial number.
        Intended to be reproducible for compatibility with the build
        system on OS X (see the comment above main, later in this
        file)."""
        hasher = hashlib.sha256()
        hashedFields = (
            str(self.versionValue),
            self.signature,
            self.issuer,
            str(self.notBefore),
            str(self.notAfter),
            self.subject,
        )
        for field in hashedFields:
            hasher.update(field.encode())
        for extensionLine in self.extensionLines or []:
            hasher.update(extensionLine.encode())
        if self.savedEmbeddedSCTListData:
            # savedEmbeddedSCTListData is
            # (embeddedSCTListSpecification, critical), where |critical|
            # may be None
            hasher.update(self.savedEmbeddedSCTListData[0].encode())
            sctCritical = self.savedEmbeddedSCTListData[1]
            if sctCritical:
                hasher.update(sctCritical.encode())
        serialBytes = list(hasher.digest()[:20])
        # Clear the most significant bit (a set bit would make the
        # number negative, which isn't valid for serial numbers) and
        # set the lowest bit of the leading byte (so that byte can
        # never be zero, which also wouldn't be valid).
        serialBytes[0] = (serialBytes[0] & 0x7F) | 0x01
        return serialBytesToString(serialBytes)
    437 
    438    def decodeParams(self, paramStream):
    439        for line in paramStream.readlines():
    440            self.decodeParam(line.strip())
    441 
    442    def decodeParam(self, line):
    443        param = line.split(":")[0]
    444        value = ":".join(line.split(":")[1:])
    445        if param == "version":
    446            self.setVersion(value)
    447        elif param == "subject":
    448            self.subject = value
    449        elif param == "issuer":
    450            self.issuer = value
    451        elif param == "validity":
    452            self.decodeValidity(value)
    453        elif param == "extension":
    454            self.decodeExtension(value)
    455        elif param == "issuerKey":
    456            self.setupKey("issuer", value)
    457        elif param == "subjectKey":
    458            self.setupKey("subject", value)
    459        elif param == "signature":
    460            self.signature = value
    461        elif param == "serialNumber":
    462            serialNumber = int(value)
    463            # Ensure only serial numbers that conform to the rules listed in
    464            # generateSerialNumber() are permitted.
    465            if serialNumber < 1 or serialNumber > 127:
    466                raise InvalidSerialNumber(value)
    467            self.serialNumber = serialBytesToString([serialNumber])
    468        else:
    469            raise UnknownParameterTypeError(param)
    470 
    471    def setVersion(self, version):
    472        intVersion = int(version)
    473        if intVersion >= 1 and intVersion <= 4:
    474            self.versionValue = intVersion - 1
    475        else:
    476            raise UnknownVersionError(version)
    477 
    478    def decodeValidity(self, duration):
    479        match = re.search("([0-9]{8})-([0-9]{8})", duration)
    480        if match:
    481            self.notBefore = datetime.datetime.strptime(match.group(1), "%Y%m%d")
    482            self.notAfter = datetime.datetime.strptime(match.group(2), "%Y%m%d")
    483        else:
    484            delta = datetime.timedelta(days=(int(duration) / 2))
    485            self.notBefore = self.now - delta
    486            self.notAfter = self.now + delta
    487 
    488    def decodeExtension(self, extension):
    489        match = re.search(r"([a-zA-Z]+)(\[critical\])?:(.*)", extension)
    490        if not match:
    491            raise UnknownExtensionTypeError(extension)
    492        extensionType = match.group(1)
    493        critical = match.group(2)
    494        value = match.group(3)
    495        if extensionType == "basicConstraints":
    496            self.addBasicConstraints(value, critical)
    497        elif extensionType == "keyUsage":
    498            self.addKeyUsage(value, critical)
    499        elif extensionType == "extKeyUsage":
    500            self.addExtKeyUsage(value, critical)
    501        elif extensionType == "subjectAlternativeName":
    502            self.addSubjectAlternativeName(value, critical)
    503        elif extensionType == "authorityInformationAccess":
    504            self.addAuthorityInformationAccess(value, critical)
    505        elif extensionType == "certificatePolicies":
    506            self.addCertificatePolicies(value, critical)
    507        elif extensionType == "nameConstraints":
    508            self.addNameConstraints(value, critical)
    509        elif extensionType == "nsCertType":
    510            self.addNSCertType(value, critical)
    511        elif extensionType == "TLSFeature":
    512            self.addTLSFeature(value, critical)
    513        elif extensionType == "embeddedSCTList":
    514            self.savedEmbeddedSCTListData = (value, critical)
    515        elif extensionType == "delegationUsage":
    516            self.addDelegationUsage(critical)
    517        elif extensionType == "qcStatements":
    518            self.addQCStatements(value, critical)
    519        else:
    520            raise UnknownExtensionTypeError(extensionType)
    521 
    522        if extensionType != "embeddedSCTList":
    523            if not self.extensionLines:
    524                self.extensionLines = []
    525            self.extensionLines.append(extension)
    526 
    def setupKey(self, subjectOrIssuer, value):
        """Replace the subject or issuer key with the one described by
        the key specification `value`. `subjectOrIssuer` must be
        "subject" or "issuer"; anything else raises
        UnknownKeyTargetError."""
        if subjectOrIssuer not in ("subject", "issuer"):
            raise UnknownKeyTargetError(subjectOrIssuer)
        key = pykey.keyFromSpecification(value)
        if subjectOrIssuer == "subject":
            self.subjectKey = key
        else:
            self.issuerKey = key
    534 
    def addExtension(self, extensionType, extensionValue, critical):
        """Append an rfc2459.Extension to self.extensions.

        extensionType is the extension's OID, extensionValue the
        pyasn1 value to DER-encode into the extnValue OCTET STRING.
        critical is either the string '[critical]' or None; only its
        truthiness matters."""
        self.extensions = self.extensions or []
        wrappedValue = univ.OctetString(encoder.encode(extensionValue))
        entry = rfc2459.Extension()
        entry["extnID"] = extensionType
        if critical:
            entry["critical"] = True
        entry["extnValue"] = wrappedValue
        self.extensions.append(entry)
    547 
    def addBasicConstraints(self, basicConstraints, critical):
        """Add a basicConstraints extension.

        basicConstraints has the form '[cA],[pathLenConstraint]'. The
        certificate is marked as a CA only when the first field is
        exactly "cA". The second field, when non-empty, is the integer
        pathLenConstraint. A specification without a comma (e.g.
        "cA") is accepted and means no pathLenConstraint."""
        fields = basicConstraints.split(",")
        cA = fields[0]
        # Tolerate a missing second field rather than failing with an
        # IndexError.
        pathLenConstraint = fields[1] if len(fields) > 1 else ""
        basicConstraintsExtension = rfc2459.BasicConstraints()
        basicConstraintsExtension["cA"] = cA == "cA"
        if pathLenConstraint:
            pathLenConstraintValue = univ.Integer(int(pathLenConstraint)).subtype(
                subtypeSpec=constraint.ValueRangeConstraint(0, float("inf"))
            )
            basicConstraintsExtension["pathLenConstraint"] = pathLenConstraintValue
        self.addExtension(
            rfc2459.id_ce_basicConstraints, basicConstraintsExtension, critical
        )
    561 
    def addKeyUsage(self, keyUsage, critical):
        """Add a keyUsage extension built by passing the keyUsage
        specification string to rfc2459.KeyUsage."""
        self.addExtension(rfc2459.id_ce_keyUsage, rfc2459.KeyUsage(keyUsage), critical)
    565 
    def keyPurposeToOID(self, keyPurpose):
        """Return the OID for a named extended key usage purpose.
        Raises UnknownKeyPurposeTypeError for an unknown name."""
        purposeOIDs = {
            "serverAuth": rfc2459.id_kp_serverAuth,
            "clientAuth": rfc2459.id_kp_clientAuth,
            "codeSigning": rfc2459.id_kp_codeSigning,
            "emailProtection": rfc2459.id_kp_emailProtection,
            # Netscape Server Gated Crypto
            "nsSGC": univ.ObjectIdentifier("2.16.840.1.113730.4.1"),
            "OCSPSigning": univ.ObjectIdentifier("1.3.6.1.5.5.7.3.9"),
            "timeStamping": rfc2459.id_kp_timeStamping,
            "tlsBinding": univ.ObjectIdentifier("0.4.0.194115.1.0"),
        }
        if keyPurpose not in purposeOIDs:
            raise UnknownKeyPurposeTypeError(keyPurpose)
        return purposeOIDs[keyPurpose]
    584 
    def addExtKeyUsage(self, extKeyUsage, critical):
        """Add an extKeyUsage extension from a comma-separated list of
        key purpose names (see keyPurposeToOID)."""
        purposes = rfc2459.ExtKeyUsageSyntax()
        # map() is lazy, so each name is converted just before it is
        # stored.
        purposeOIDs = map(self.keyPurposeToOID, extKeyUsage.split(","))
        for position, oid in enumerate(purposeOIDs):
            purposes.setComponentByPosition(position, oid)
        self.addExtension(rfc2459.id_ce_extKeyUsage, purposes, critical)
    592 
    def addSubjectAlternativeName(self, names, critical):
        """Add a subjectAlternativeName extension from a
        comma-separated list of names.

        Each entry is classified in this order: containing '/' ->
        directoryName; containing '@' -> rfc822Name; starting with
        'ip4:' -> iPAddress (IPv4); anything else -> dNSName."""
        ipv4Prefix = "ip4:"
        altNames = rfc2459.SubjectAltName()
        for position, entry in enumerate(names.split(",")):
            generalName = rfc2459.GeneralName()
            if "/" in entry:
                directoryTag = tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4)
                generalName["directoryName"] = stringToDN(entry, directoryTag)
            elif "@" in entry:
                generalName["rfc822Name"] = entry
            elif entry.startswith(ipv4Prefix):
                address = entry[len(ipv4Prefix) :]
                generalName["iPAddress"] = socket.inet_pton(socket.AF_INET, address)
            else:
                # The string may have things like '\0' (i.e. a slash
                # followed by the number zero) that have to be decoded into
                # the resulting '\x00' (i.e. a byte with value zero).
                generalName["dNSName"] = entry.encode().decode("unicode_escape")
            altNames.setComponentByPosition(position, generalName)
        self.addExtension(rfc2459.id_ce_subjectAltName, altNames, critical)
    619 
    def addAuthorityInformationAccess(self, ocspURI, critical):
        """Add an authorityInformationAccess extension containing a
        single access description for the given OCSP URI."""
        accessDescriptions = univ.Sequence()
        accessDescriptions.setComponentByPosition(
            0, stringToAccessDescription(ocspURI)
        )
        self.addExtension(
            rfc2459.id_pe_authorityInfoAccess, accessDescriptions, critical
        )
    625 
    def addCertificatePolicies(self, policyOIDs, critical):
        """Add a certificatePolicies extension from a comma-separated
        list of policy OIDs. The name "any" is shorthand for the OID
        2.5.29.32.0."""
        policies = rfc2459.CertificatePolicies()
        for pos, requested in enumerate(policyOIDs.split(",")):
            oid = "2.5.29.32.0" if requested == "any" else requested
            policy = rfc2459.PolicyInformation()
            policy["policyIdentifier"] = rfc2459.CertPolicyId(oid)
            policies.setComponentByPosition(pos, policy)
        self.addExtension(rfc2459.id_ce_certificatePolicies, policies, critical)
    637 
    def addNameConstraints(self, constraints, critical):
        """Add a nameConstraints extension.

        constraints is 'permitted:<names>' or 'excluded:<names>',
        where <names> is a comma-separated list. Entries containing
        '/' become directoryNames, all others dNSNames. Raises
        UnknownNameConstraintsSpecificationError for any other
        prefix."""
        # prefix -> (NameConstraints field, context tag number)
        subtreeFields = {
            "permitted": ("permittedSubtrees", 0),
            "excluded": ("excludedSubtrees", 1),
        }
        kind, separator, subtrees = constraints.partition(":")
        if not separator or kind not in subtreeFields:
            raise UnknownNameConstraintsSpecificationError(constraints)
        subtreesType, subtreesTag = subtreeFields[kind]

        nameConstraints = rfc2459.NameConstraints()
        subtreesContextTag = tag.Tag(
            tag.tagClassContext, tag.tagFormatConstructed, subtreesTag
        )
        generalSubtrees = rfc2459.GeneralSubtrees().subtype(
            implicitTag=subtreesContextTag
        )
        for pos, entry in enumerate(subtrees.split(",")):
            generalName = rfc2459.GeneralName()
            if "/" in entry:
                directoryTag = tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4)
                generalName["directoryName"] = stringToDN(entry, directoryTag)
            else:
                generalName["dNSName"] = entry
            generalSubtree = rfc2459.GeneralSubtree()
            generalSubtree["base"] = generalName
            generalSubtrees.setComponentByPosition(pos, generalSubtree)
        nameConstraints[subtreesType] = generalSubtrees
        self.addExtension(rfc2459.id_ce_nameConstraints, nameConstraints, critical)
    666 
    def addNSCertType(self, certType, critical):
        """Add the nsCertType extension (OID 2.16.840.1.113730.1.1).

        Only the "sslServer" type is accepted; any other `certType` raises
        UnknownNSCertTypeError.
        """
        if certType == "sslServer":
            nsCertTypeOID = univ.ObjectIdentifier("2.16.840.1.113730.1.1")
            sslServerBits = univ.BitString("'01'B")
            self.addExtension(nsCertTypeOID, sslServerBits, critical)
            return
        raise UnknownNSCertTypeError(certType)
    675 
    def addDelegationUsage(self, critical):
        """Add the delegation usage extension (OID 1.3.6.1.4.1.44363.44).

        The extension value is an ASN.1 NULL. The extension may not be
        marked critical: a truthy `critical` raises
        UnknownDelegatedCredentialError.
        """
        if not critical:
            delegationUsageOID = univ.ObjectIdentifier("1.3.6.1.4.1.44363.44")
            self.addExtension(delegationUsageOID, univ.Null(), critical)
            return
        raise UnknownDelegatedCredentialError(critical)
    682 
    def addTLSFeature(self, features, critical):
        """Add a TLS Feature extension (OID 1.3.6.1.5.5.7.1.24).

        `features` is a comma-separated list. Each entry, after stripping
        surrounding whitespace, is either an integer literal or one of the
        names in `namedFeatures` (currently only "OCSPMustStaple", which
        maps to 5). The values are emitted in order as a sequence of
        integers.

        Raises UnknownTLSFeature for an entry that is neither an integer
        nor a known name.
        """
        namedFeatures = {"OCSPMustStaple": 5}
        sequence = univ.Sequence()
        for pos, feature in enumerate(features.split(",")):
            feature = feature.strip()
            try:
                featureValue = int(feature)
            except ValueError:
                # Not numeric, so it has to be a known name. Test membership
                # explicitly rather than catching every Exception around the
                # lookup, which could hide unrelated errors.
                if feature not in namedFeatures:
                    raise UnknownTLSFeature(feature)
                featureValue = namedFeatures[feature]
            sequence.setComponentByPosition(pos, univ.Integer(featureValue))
        self.addExtension(
            univ.ObjectIdentifier("1.3.6.1.5.5.7.1.24"), sequence, critical
        )
    700 
    def addEmbeddedSCTListData(self):
        """Add the embedded SCT list extension (OID 1.3.6.1.4.1.11129.2.4.2).

        Reads the (specification, critical) pair stored in
        self.savedEmbeddedSCTListData. The specification is a
        comma-separated list of entries matching
        "<key specification>:<YYYYMMDD>[:<leaf index>]". One SCT is signed
        per entry over the current TBSCertificate; each encoded SCT gets a
        two-byte big-endian length prefix, and the concatenated list gets
        a second one.

        Raises InvalidSCTSpecification for an entry that does not match the
        expected pattern.
        """
        (scts, critical) = self.savedEmbeddedSCTListData
        # Nothing in the loop below modifies this certificate, so every SCT
        # covers the same TBSCertificate bytes. Encode them once up front
        # instead of rebuilding and re-encoding the structure per entry.
        tbsDER = encoder.encode(self.getTBSCertificate())
        encodedSCTs = []
        for sctSpec in scts.split(","):
            match = re.search(r"(\w+):(\d{8}):?(\d+)?", sctSpec)
            if not match:
                raise InvalidSCTSpecification(sctSpec)
            (keySpec, dateSpec, leafIndex) = match.groups()
            # The leaf index is optional; when absent it stays None.
            if leafIndex:
                leafIndex = int(leafIndex)
            key = pykey.keyFromSpecification(keySpec)
            timestamp = datetime.datetime.strptime(dateSpec, "%Y%m%d")
            sct = pyct.SCT(
                key, timestamp, pyct.PrecertEntry(tbsDER, self.issuerKey), leafIndex
            )
            signed = sct.signAndEncode()
            encodedSCTs.append(pack("!H", len(signed)) + signed)
        encodedSCTBytes = b"".join(encodedSCTs)
        extensionBytes = pack("!H", len(encodedSCTBytes)) + encodedSCTBytes
        self.addExtension(
            univ.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.2"),
            univ.OctetString(extensionBytes),
            critical,
        )
    730 
    def addQCStatements(self, qcStatements, critical):
        """Add a QCStatements extension (OID 1.3.6.1.5.5.7.1.3).

        `qcStatements` is a comma-separated list of
        "<statement ID>[:<statement info>]" entries, both parts given as
        object identifier strings. Each entry becomes a sequence holding
        the statement ID, followed, when a non-empty info part is present,
        by a nested sequence holding the info OID.
        """
        statements = univ.Sequence()
        for index, spec in enumerate(qcStatements.split(",")):
            fields = spec.split(":")
            statement = univ.Sequence()
            statement.setComponentByPosition(0, univ.ObjectIdentifier(fields[0]))
            # Only the first field after the ID is used as the statement
            # info; a missing or empty one leaves the statement ID-only.
            info = fields[1] if len(fields) > 1 else None
            if info:
                infoSequence = univ.Sequence()
                infoSequence.setComponentByPosition(0, univ.ObjectIdentifier(info))
                statement.setComponentByPosition(1, infoSequence)
            statements.setComponentByPosition(index, statement)
        qcStatementsOID = univ.ObjectIdentifier("1.3.6.1.5.5.7.1.3")
        self.addExtension(qcStatementsOID, statements, critical)
    753 
    def getVersion(self):
        """Return self.versionValue as an rfc2459.Version.

        The value is wrapped in explicit context tag 0.
        """
        version = rfc2459.Version(self.versionValue)
        versionTag = tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)
        return version.subtype(explicitTag=versionTag)
    758 
    def getSerialNumber(self):
        """Decode self.serialNumber and return the decoded value."""
        decoded = decoder.decode(self.serialNumber)
        # Only the first element of the decoder's result is used.
        return decoded[0]
    761 
    def getIssuer(self):
        """Return the issuer specification converted by stringToDN."""
        issuerName = stringToDN(self.issuer)
        return issuerName
    764 
    def getValidity(self):
        """Return an rfc2459.Validity with notBefore and notAfter filled in."""
        window = rfc2459.Validity()
        bounds = (
            ("notBefore", self.getNotBefore),
            ("notAfter", self.getNotAfter),
        )
        for fieldName, getBound in bounds:
            window[fieldName] = getBound()
        return window
    770 
    def getNotBefore(self):
        """Return self.notBefore converted by datetimeToTime."""
        startTime = datetimeToTime(self.notBefore)
        return startTime
    773 
    def getNotAfter(self):
        """Return self.notAfter converted by datetimeToTime."""
        endTime = datetimeToTime(self.notAfter)
        return endTime
    776 
    def getSubject(self):
        """Return the subject specification converted by stringToDN."""
        subjectName = stringToDN(self.subject)
        return subjectName
    779 
    def getTBSCertificate(self):
        """Build and return the rfc2459.TBSCertificate for this certificate.

        Fills in version, serial number, signature algorithm, issuer,
        validity, subject and subject public key info. When
        self.extensions is non-empty, the extensions are added in order
        under explicit context tag 3; otherwise the field is left unset.
        """
        (algorithmOID, _) = stringToAlgorithmIdentifiers(self.signature)
        tbs = rfc2459.TBSCertificate()
        tbs["version"] = self.getVersion()
        tbs["serialNumber"] = self.getSerialNumber()
        tbs["signature"] = algorithmOID
        tbs["issuer"] = self.getIssuer()
        tbs["validity"] = self.getValidity()
        tbs["subject"] = self.getSubject()
        tbs["subjectPublicKeyInfo"] = self.subjectKey.asSubjectPublicKeyInfo()
        if not self.extensions:
            return tbs
        extensionsTag = tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3)
        taggedExtensions = rfc2459.Extensions().subtype(explicitTag=extensionsTag)
        for index, ext in enumerate(self.extensions):
            taggedExtensions.setComponentByPosition(index, ext)
        tbs["extensions"] = taggedExtensions
        return tbs
    800 
    def toDER(self):
        """Return the signed certificate in encoded form.

        The TBSCertificate is encoded and signed with self.issuerKey using
        the hash algorithm selected by self.signature. The complete
        certificate is then encoded and returned.
        """
        (algorithmOID, hashAlgorithm) = stringToAlgorithmIdentifiers(self.signature)
        cert = rfc2459.Certificate()
        tbs = self.getTBSCertificate()
        cert["tbsCertificate"] = tbs
        cert["signatureAlgorithm"] = algorithmOID
        # The signature covers the encoded TBSCertificate.
        signedBytes = encoder.encode(tbs)
        signature = self.issuerKey.sign(signedBytes, hashAlgorithm)
        cert["signatureValue"] = signature
        return encoder.encode(cert)
    810 
    811    def toPEM(self):
    812        output = "-----BEGIN CERTIFICATE-----"
    813        der = self.toDER()
    814        b64 = base64.b64encode(der).decode()
    815        while b64:
    816            output += "\n" + b64[:64]
    817            b64 = b64[64:]
    818        output += "\n-----END CERTIFICATE-----"
    819        return output
    820 
    821 
    822 # The build harness will call this function with an output
    823 # file-like object and a path to a file containing a
    824 # specification. This will read the specification and output
    825 # the certificate as PEM.
    826 def main(output, inputPath):
    827    with open(inputPath) as configStream:
    828        output.write(Certificate(configStream).toPEM() + "\n")
    829 
    830 
    831 # When run as a standalone program, this will read a specification from
    832 # stdin and output the certificate as PEM to stdout.
    833 if __name__ == "__main__":
    834    print(Certificate(sys.stdin).toPEM())