tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

pycms.py (11510B)


      1 #!/usr/bin/env python
      2 #
      3 # This Source Code Form is subject to the terms of the Mozilla Public
      4 # License, v. 2.0. If a copy of the MPL was not distributed with this
      5 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
      6 
      7 """
      8 Reads a specification from stdin and outputs a PKCS7 (CMS) message with
      9 the desired properties.
     10 
     11 The specification format is as follows:
     12 
     13 sha1:<hex string>
     14 sha256:<hex string>
     15 md5:<hex string>
     16 tamperDigest:sha1 - Only sha1 is supported
     17 erase:{certificate, signerInfo}
     18 signer:
     19 <pycert specification>
     20 
     21 Any combination of sha1, sha256 and md5 may be specified. The value of
     22 each hash directive is what will be put in the messageDigest
     23 attribute of the SignerInfo that corresponds to the signature
     24 algorithm defined by the hash algorithm and key type of the
     25 default key. Together, these comprise the signerInfos field of
     26 the SignedData. If no hash is specified, the signerInfos
     27 will be an empty SET (i.e. there will be no actual signature
     28 information).
     29 The certificate specification must come last.
     30 The tamperDigest directive makes the script tamper with the hash while
     31 generating SignedAttributes, so that the SignedAttributes signature will be incorrect.
     32 The erase directive specifies which PKCS7 field to strip (certificate or signerInfo).
     33 """
     34 
     35 import base64
     36 import sys
     37 from enum import Enum
     38 from io import StringIO
     39 
     40 import pycert
     41 import pykey
     42 from pyasn1.codec.der import decoder, encoder
     43 from pyasn1.type import tag, univ
     44 from pyasn1_modules import rfc2315, rfc2459
     45 
     46 
     47 class Error(Exception):
     48    """Base class for exceptions in this module."""
     49 
     50    pass
     51 
     52 
     53 class UnknownDirectiveError(Error):
     54    """Helper exception type to handle unknown specification
     55    directives."""
     56 
     57    def __init__(self, directive):
     58        super().__init__()
     59        self.directive = directive
     60 
     61    def __str__(self):
     62        return "Unknown directive %s" % repr(self.directive)
     63 
     64 
class FieldStrip(Enum):
    """Fields that the "erase:" directive can strip from the generated
    message. Each member's value is the spelling accepted in the
    specification."""

    # Leave the certificates field out of the SignedData.
    CERTIFICATE = "certificate"
    # Leave the signerInfos field out of the SignedData.
    SIGNER_INFO = "signerInfo"
     68 
     69 
class HashToTamper(Enum):
    """Hashes that the "tamperDigest:" directive can tamper with. Each
    member's value is the spelling accepted in the specification."""

    # Only sha1 is supported.
    SHA1 = "sha1"
     72 
     73 
     74 class CMS:
     75    """Utility class for reading a CMS specification and
     76    generating a CMS message"""
     77 
     78    def __init__(self, paramStream):
     79        self.sha1 = ""
     80        self.sha256 = ""
     81        self.md5 = ""
     82 
     83        self.fieldStrip = ""
     84        self.tamperDigest = ""
     85 
     86        signerSpecification = StringIO()
     87        readingSignerSpecification = False
     88        for line in paramStream.readlines():
     89            if readingSignerSpecification:
     90                print(line.strip(), file=signerSpecification)
     91            elif line.strip() == "signer:":
     92                readingSignerSpecification = True
     93            elif line.startswith("sha1:"):
     94                self.sha1 = line.strip()[len("sha1:") :]
     95            elif line.startswith("sha256:"):
     96                self.sha256 = line.strip()[len("sha256:") :]
     97            elif line.startswith("md5:"):
     98                self.md5 = line.strip()[len("md5:") :]
     99            elif line.startswith("erase:"):
    100                if line.strip()[len("erase:") :] == FieldStrip.CERTIFICATE.value:
    101                    self.fieldStrip = FieldStrip.CERTIFICATE
    102                elif line.strip()[len("erase:") :] == FieldStrip.SIGNER_INFO.value:
    103                    self.fieldStrip = FieldStrip.SIGNER_INFO
    104                else:
    105                    raise UnknownDirectiveError(line.strip())
    106            elif line.startswith("tamperDigest"):
    107                if line.strip()[len("tamperDigest:") :] == HashToTamper.SHA1.value:
    108                    self.tamperDigest = HashToTamper.SHA1
    109                else:
    110                    raise UnknownDirectiveError(line.strip())
    111            else:
    112                raise UnknownDirectiveError(line.strip())
    113        signerSpecification.seek(0)
    114        self.signer = pycert.Certificate(signerSpecification)
    115        self.signingKey = pykey.keyFromSpecification("default")
    116 
    117    def buildAuthenticatedAttributes(self, value, implicitTag=None):
    118        """Utility function to build a pyasn1 AuthenticatedAttributes
    119        object. Useful because when building a SignerInfo, the
    120        authenticatedAttributes needs to be tagged implicitly, but when
    121        signing an AuthenticatedAttributes, it needs the explicit SET
    122        tag."""
    123        if implicitTag:
    124            authenticatedAttributes = rfc2315.Attributes().subtype(
    125                implicitTag=implicitTag
    126            )
    127        else:
    128            authenticatedAttributes = rfc2315.Attributes()
    129        contentTypeAttribute = rfc2315.Attribute()
    130        # PKCS#9 contentType
    131        contentTypeAttribute["type"] = univ.ObjectIdentifier("1.2.840.113549.1.9.3")
    132        contentTypeAttribute["values"] = univ.SetOf(rfc2459.AttributeValue())
    133        # PKCS#7 data
    134        contentTypeAttribute["values"][0] = univ.ObjectIdentifier(
    135            "1.2.840.113549.1.7.1"
    136        )
    137        authenticatedAttributes[0] = contentTypeAttribute
    138        hashAttribute = rfc2315.Attribute()
    139        # PKCS#9 messageDigest
    140        hashAttribute["type"] = univ.ObjectIdentifier("1.2.840.113549.1.9.4")
    141        hashAttribute["values"] = univ.SetOf(rfc2459.AttributeValue())
    142        hashAttribute["values"][0] = univ.OctetString(hexValue=value)
    143        authenticatedAttributes[1] = hashAttribute
    144        return authenticatedAttributes
    145 
    146    def pykeyHashToDigestAlgorithm(self, pykeyHash):
    147        """Given a pykey hash algorithm identifier, builds an
    148        AlgorithmIdentifier for use with pyasn1."""
    149        if pykeyHash == pykey.HASH_SHA1:
    150            oidString = "1.3.14.3.2.26"
    151        elif pykeyHash == pykey.HASH_SHA256:
    152            oidString = "2.16.840.1.101.3.4.2.1"
    153        elif pykeyHash == pykey.HASH_MD5:
    154            oidString = "1.2.840.113549.2.5"
    155        else:
    156            raise pykey.UnknownHashAlgorithmError(pykeyHash)
    157        algorithmIdentifier = rfc2459.AlgorithmIdentifier()
    158        algorithmIdentifier["algorithm"] = univ.ObjectIdentifier(oidString)
    159        # Directly setting parameters to univ.Null doesn't currently work.
    160        nullEncapsulated = encoder.encode(univ.Null())
    161        algorithmIdentifier["parameters"] = univ.Any(nullEncapsulated)
    162        return algorithmIdentifier
    163 
    164    def buildSignerInfo(self, certificate, pykeyHash, digestValue):
    165        """Given a pyasn1 certificate, a pykey hash identifier
    166        and a hash value, creates a SignerInfo with the
    167        appropriate values."""
    168        signerInfo = rfc2315.SignerInfo()
    169        signerInfo["version"] = 1
    170        issuerAndSerialNumber = rfc2315.IssuerAndSerialNumber()
    171        issuerAndSerialNumber["issuer"] = self.signer.getIssuer()
    172        issuerAndSerialNumber["serialNumber"] = certificate["tbsCertificate"][
    173            "serialNumber"
    174        ]
    175        signerInfo["issuerAndSerialNumber"] = issuerAndSerialNumber
    176        signerInfo["digestAlgorithm"] = self.pykeyHashToDigestAlgorithm(pykeyHash)
    177        rsa = rfc2459.AlgorithmIdentifier()
    178        rsa["algorithm"] = rfc2459.rsaEncryption
    179        rsa["parameters"] = univ.Null()
    180        authenticatedAttributes = self.buildAuthenticatedAttributes(
    181            digestValue,
    182            implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0),
    183        )
    184        authenticatedAttributesTBS = self.buildAuthenticatedAttributes(digestValue)
    185        signerInfo["authenticatedAttributes"] = authenticatedAttributes
    186        signerInfo["digestEncryptionAlgorithm"] = rsa
    187        authenticatedAttributesEncoded = encoder.encode(authenticatedAttributesTBS)
    188        signature = self.signingKey.sign(authenticatedAttributesEncoded, pykeyHash)
    189        if self.tamperDigest == HashToTamper.SHA1 and pykeyHash == pykey.HASH_SHA1:
    190            digestValue = hex((int(digestValue[0], 16) + 1) % 16)[2:] + digestValue[1:]
    191            authenticatedAttributesTBSTamperedHash = self.buildAuthenticatedAttributes(
    192                digestValue
    193            )
    194            authenticatedAttributesTamperedEncoded = encoder.encode(
    195                authenticatedAttributesTBSTamperedHash
    196            )
    197            # The signerInfo has an attribute with the initial hash
    198            # But the tampered hash attributes are signed
    199            signature = self.signingKey.sign(
    200                authenticatedAttributesTamperedEncoded, pykeyHash
    201            )
    202        # signature will be a hexified bit string of the form
    203        # "'<hex bytes>'H". For some reason that's what BitString wants,
    204        # but since this is an OCTET STRING, we have to strip off the
    205        # quotation marks and trailing "H".
    206        signerInfo["encryptedDigest"] = univ.OctetString(hexValue=signature[1:-2])
    207        return signerInfo
    208 
    209    def toDER(self):
    210        contentInfo = rfc2315.ContentInfo()
    211        contentInfo["contentType"] = rfc2315.signedData
    212 
    213        signedData = rfc2315.SignedData()
    214        signedData["version"] = rfc2315.Version(1)
    215 
    216        digestAlgorithms = rfc2315.DigestAlgorithmIdentifiers()
    217        digestAlgorithms[0] = self.pykeyHashToDigestAlgorithm(pykey.HASH_SHA1)
    218        signedData["digestAlgorithms"] = digestAlgorithms
    219 
    220        dataContentInfo = rfc2315.ContentInfo()
    221        dataContentInfo["contentType"] = rfc2315.data
    222        signedData["contentInfo"] = dataContentInfo
    223 
    224        certificates = rfc2315.ExtendedCertificatesAndCertificates().subtype(
    225            implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
    226        )
    227        extendedCertificateOrCertificate = rfc2315.ExtendedCertificateOrCertificate()
    228        certificate = decoder.decode(
    229            self.signer.toDER(), asn1Spec=rfc2459.Certificate()
    230        )[0]
    231        extendedCertificateOrCertificate["certificate"] = certificate
    232        certificates[0] = extendedCertificateOrCertificate
    233 
    234        if self.fieldStrip != FieldStrip.CERTIFICATE:
    235            signedData["certificates"] = certificates
    236 
    237        if self.fieldStrip != FieldStrip.SIGNER_INFO:
    238            signerInfos = rfc2315.SignerInfos()
    239 
    240            if len(self.sha1) > 0:
    241                signerInfos[len(signerInfos)] = self.buildSignerInfo(
    242                    certificate, pykey.HASH_SHA1, self.sha1
    243                )
    244            if len(self.sha256) > 0:
    245                signerInfos[len(signerInfos)] = self.buildSignerInfo(
    246                    certificate, pykey.HASH_SHA256, self.sha256
    247                )
    248            if len(self.md5) > 0:
    249                signerInfos[len(signerInfos)] = self.buildSignerInfo(
    250                    certificate, pykey.HASH_MD5, self.md5
    251                )
    252            signedData["signerInfos"] = signerInfos
    253 
    254        encoded = encoder.encode(signedData)
    255        anyTag = univ.Any(encoded).subtype(
    256            explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
    257        )
    258 
    259        contentInfo["content"] = anyTag
    260        return encoder.encode(contentInfo)
    261 
    262    def toPEM(self):
    263        output = "-----BEGIN PKCS7-----"
    264        der = self.toDER()
    265        b64 = base64.b64encode(der)
    266        while b64:
    267            output += "\n" + b64[:64].decode("utf-8")
    268            b64 = b64[64:]
    269        output += "\n-----END PKCS7-----\n"
    270        return output
    271 
    272 
    273 # The build harness will call this function with an output
    274 # file-like object and a path to a file containing a
    275 # specification. This will read the specification and output
    276 # the cms message as PEM.
    277 def main(output, inputPath):
    278    with open(inputPath) as configStream:
    279        output.write(CMS(configStream).toPEM() + "\n")
    280 
    281 
    282 # When run as a standalone program, this will read a specification from
    283 # stdin and output the cms message as PEM.
    284 if __name__ == "__main__":
    285    print(CMS(sys.stdin).toPEM())