pycms.py (11510B)
1 #!/usr/bin/env python 2 # 3 # This Source Code Form is subject to the terms of the Mozilla Public 4 # License, v. 2.0. If a copy of the MPL was not distributed with this 5 # file, You can obtain one at http://mozilla.org/MPL/2.0/. 6 7 """ 8 Reads a specification from stdin and outputs a PKCS7 (CMS) message with 9 the desired properties. 10 11 The specification format is as follows: 12 13 sha1:<hex string> 14 sha256:<hex string> 15 md5:<hex string> 16 tamperDigest:sha1 - Only sha1 is supported 17 erase:{certificate, signerInfo} 18 signer: 19 <pycert specification> 20 21 Eith or both of sha1 and sha256 may be specified. The value of 22 each hash directive is what will be put in the messageDigest 23 attribute of the SignerInfo that corresponds to the signature 24 algorithm defined by the hash algorithm and key type of the 25 default key. Together, these comprise the signerInfos field of 26 the SignedData. If neither hash is specified, the signerInfos 27 will be an empty SET (i.e. there will be no actual signature 28 information). 29 The certificate specification must come last. 30 The script provides a possibility to tamper the hash while generating 31 SignedAttributes, such that the SignedAttributes signature will be incorrect 32 Erase allows specifying which PKCS7 field to strip (supports certificate or signerInfo) 33 """ 34 35 import base64 36 import sys 37 from enum import Enum 38 from io import StringIO 39 40 import pycert 41 import pykey 42 from pyasn1.codec.der import decoder, encoder 43 from pyasn1.type import tag, univ 44 from pyasn1_modules import rfc2315, rfc2459 45 46 47 class Error(Exception): 48 """Base class for exceptions in this module.""" 49 50 pass 51 52 53 class UnknownDirectiveError(Error): 54 """Helper exception type to handle unknown specification 55 directives.""" 56 57 def __init__(self, directive): 58 super().__init__() 59 self.directive = directive 60 61 def __str__(self): 62 return "Unknown directive %s" % repr(self.directive) 63 64 65 class FieldStrip(Enum): 66 CERTIFICATE = "certificate" 67 SIGNER_INFO = "signerInfo" 68 69 70 class HashToTamper(Enum): 71 SHA1 = "sha1" 72 73 74 class CMS: 75 """Utility class for reading a CMS specification and 76 generating a CMS message""" 77 78 def __init__(self, paramStream): 79 self.sha1 = "" 80 self.sha256 = "" 81 self.md5 = "" 82 83 self.fieldStrip = "" 84 self.tamperDigest = "" 85 86 signerSpecification = StringIO() 87 readingSignerSpecification = False 88 for line in paramStream.readlines(): 89 if readingSignerSpecification: 90 print(line.strip(), file=signerSpecification) 91 elif line.strip() == "signer:": 92 readingSignerSpecification = True 93 elif line.startswith("sha1:"): 94 self.sha1 = line.strip()[len("sha1:") :] 95 elif line.startswith("sha256:"): 96 self.sha256 = line.strip()[len("sha256:") :] 97 elif line.startswith("md5:"): 98 self.md5 = line.strip()[len("md5:") :] 99 elif line.startswith("erase:"): 100 if line.strip()[len("erase:") :] == FieldStrip.CERTIFICATE.value: 101 self.fieldStrip = FieldStrip.CERTIFICATE 102 elif line.strip()[len("erase:") :] == FieldStrip.SIGNER_INFO.value: 103 self.fieldStrip = FieldStrip.SIGNER_INFO 104 else: 105 raise UnknownDirectiveError(line.strip()) 106 elif line.startswith("tamperDigest"): 107 if line.strip()[len("tamperDigest:") :] == HashToTamper.SHA1.value: 108 self.tamperDigest = HashToTamper.SHA1 109 else: 110 raise UnknownDirectiveError(line.strip()) 111 else: 112 raise UnknownDirectiveError(line.strip()) 113 signerSpecification.seek(0) 114 self.signer = pycert.Certificate(signerSpecification) 115 self.signingKey = pykey.keyFromSpecification("default") 116 117 def buildAuthenticatedAttributes(self, value, implicitTag=None): 118 """Utility function to build a pyasn1 AuthenticatedAttributes 119 object. Useful because when building a SignerInfo, the 120 authenticatedAttributes needs to be tagged implicitly, but when 121 signing an AuthenticatedAttributes, it needs the explicit SET 122 tag.""" 123 if implicitTag: 124 authenticatedAttributes = rfc2315.Attributes().subtype( 125 implicitTag=implicitTag 126 ) 127 else: 128 authenticatedAttributes = rfc2315.Attributes() 129 contentTypeAttribute = rfc2315.Attribute() 130 # PKCS#9 contentType 131 contentTypeAttribute["type"] = univ.ObjectIdentifier("1.2.840.113549.1.9.3") 132 contentTypeAttribute["values"] = univ.SetOf(rfc2459.AttributeValue()) 133 # PKCS#7 data 134 contentTypeAttribute["values"][0] = univ.ObjectIdentifier( 135 "1.2.840.113549.1.7.1" 136 ) 137 authenticatedAttributes[0] = contentTypeAttribute 138 hashAttribute = rfc2315.Attribute() 139 # PKCS#9 messageDigest 140 hashAttribute["type"] = univ.ObjectIdentifier("1.2.840.113549.1.9.4") 141 hashAttribute["values"] = univ.SetOf(rfc2459.AttributeValue()) 142 hashAttribute["values"][0] = univ.OctetString(hexValue=value) 143 authenticatedAttributes[1] = hashAttribute 144 return authenticatedAttributes 145 146 def pykeyHashToDigestAlgorithm(self, pykeyHash): 147 """Given a pykey hash algorithm identifier, builds an 148 AlgorithmIdentifier for use with pyasn1.""" 149 if pykeyHash == pykey.HASH_SHA1: 150 oidString = "1.3.14.3.2.26" 151 elif pykeyHash == pykey.HASH_SHA256: 152 oidString = "2.16.840.1.101.3.4.2.1" 153 elif pykeyHash == pykey.HASH_MD5: 154 oidString = "1.2.840.113549.2.5" 155 else: 156 raise pykey.UnknownHashAlgorithmError(pykeyHash) 157 algorithmIdentifier = rfc2459.AlgorithmIdentifier() 158 algorithmIdentifier["algorithm"] = univ.ObjectIdentifier(oidString) 159 # Directly setting parameters to univ.Null doesn't currently work. 160 nullEncapsulated = encoder.encode(univ.Null()) 161 algorithmIdentifier["parameters"] = univ.Any(nullEncapsulated) 162 return algorithmIdentifier 163 164 def buildSignerInfo(self, certificate, pykeyHash, digestValue): 165 """Given a pyasn1 certificate, a pykey hash identifier 166 and a hash value, creates a SignerInfo with the 167 appropriate values.""" 168 signerInfo = rfc2315.SignerInfo() 169 signerInfo["version"] = 1 170 issuerAndSerialNumber = rfc2315.IssuerAndSerialNumber() 171 issuerAndSerialNumber["issuer"] = self.signer.getIssuer() 172 issuerAndSerialNumber["serialNumber"] = certificate["tbsCertificate"][ 173 "serialNumber" 174 ] 175 signerInfo["issuerAndSerialNumber"] = issuerAndSerialNumber 176 signerInfo["digestAlgorithm"] = self.pykeyHashToDigestAlgorithm(pykeyHash) 177 rsa = rfc2459.AlgorithmIdentifier() 178 rsa["algorithm"] = rfc2459.rsaEncryption 179 rsa["parameters"] = univ.Null() 180 authenticatedAttributes = self.buildAuthenticatedAttributes( 181 digestValue, 182 implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0), 183 ) 184 authenticatedAttributesTBS = self.buildAuthenticatedAttributes(digestValue) 185 signerInfo["authenticatedAttributes"] = authenticatedAttributes 186 signerInfo["digestEncryptionAlgorithm"] = rsa 187 authenticatedAttributesEncoded = encoder.encode(authenticatedAttributesTBS) 188 signature = self.signingKey.sign(authenticatedAttributesEncoded, pykeyHash) 189 if self.tamperDigest == HashToTamper.SHA1 and pykeyHash == pykey.HASH_SHA1: 190 digestValue = hex((int(digestValue[0], 16) + 1) % 16)[2:] + digestValue[1:] 191 authenticatedAttributesTBSTamperedHash = self.buildAuthenticatedAttributes( 192 digestValue 193 ) 194 authenticatedAttributesTamperedEncoded = encoder.encode( 195 authenticatedAttributesTBSTamperedHash 196 ) 197 # The signerInfo has an attribute with the initial hash 198 # But the tampered hash attributes are signed 199 signature = self.signingKey.sign( 200 authenticatedAttributesTamperedEncoded, pykeyHash 201 ) 202 # signature will be a hexified bit string of the form 203 # "'<hex bytes>'H". For some reason that's what BitString wants, 204 # but since this is an OCTET STRING, we have to strip off the 205 # quotation marks and trailing "H". 206 signerInfo["encryptedDigest"] = univ.OctetString(hexValue=signature[1:-2]) 207 return signerInfo 208 209 def toDER(self): 210 contentInfo = rfc2315.ContentInfo() 211 contentInfo["contentType"] = rfc2315.signedData 212 213 signedData = rfc2315.SignedData() 214 signedData["version"] = rfc2315.Version(1) 215 216 digestAlgorithms = rfc2315.DigestAlgorithmIdentifiers() 217 digestAlgorithms[0] = self.pykeyHashToDigestAlgorithm(pykey.HASH_SHA1) 218 signedData["digestAlgorithms"] = digestAlgorithms 219 220 dataContentInfo = rfc2315.ContentInfo() 221 dataContentInfo["contentType"] = rfc2315.data 222 signedData["contentInfo"] = dataContentInfo 223 224 certificates = rfc2315.ExtendedCertificatesAndCertificates().subtype( 225 implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0) 226 ) 227 extendedCertificateOrCertificate = rfc2315.ExtendedCertificateOrCertificate() 228 certificate = decoder.decode( 229 self.signer.toDER(), asn1Spec=rfc2459.Certificate() 230 )[0] 231 extendedCertificateOrCertificate["certificate"] = certificate 232 certificates[0] = extendedCertificateOrCertificate 233 234 if self.fieldStrip != FieldStrip.CERTIFICATE: 235 signedData["certificates"] = certificates 236 237 if self.fieldStrip != FieldStrip.SIGNER_INFO: 238 signerInfos = rfc2315.SignerInfos() 239 240 if len(self.sha1) > 0: 241 signerInfos[len(signerInfos)] = self.buildSignerInfo( 242 certificate, pykey.HASH_SHA1, self.sha1 243 ) 244 if len(self.sha256) > 0: 245 signerInfos[len(signerInfos)] = self.buildSignerInfo( 246 certificate, pykey.HASH_SHA256, self.sha256 247 ) 248 if len(self.md5) > 0: 249 signerInfos[len(signerInfos)] = self.buildSignerInfo( 250 certificate, pykey.HASH_MD5, self.md5 251 ) 252 signedData["signerInfos"] = signerInfos 253 254 encoded = encoder.encode(signedData) 255 anyTag = univ.Any(encoded).subtype( 256 explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0) 257 ) 258 259 contentInfo["content"] = anyTag 260 return encoder.encode(contentInfo) 261 262 def toPEM(self): 263 output = "-----BEGIN PKCS7-----" 264 der = self.toDER() 265 b64 = base64.b64encode(der) 266 while b64: 267 output += "\n" + b64[:64].decode("utf-8") 268 b64 = b64[64:] 269 output += "\n-----END PKCS7-----\n" 270 return output 271 272 273 # The build harness will call this function with an output 274 # file-like object and a path to a file containing a 275 # specification. This will read the specification and output 276 # the cms message as PEM. 277 def main(output, inputPath): 278 with open(inputPath) as configStream: 279 output.write(CMS(configStream).toPEM() + "\n") 280 281 282 # When run as a standalone program, this will read a specification from 283 # stdin and output the cms message as PEM. 284 if __name__ == "__main__": 285 print(CMS(sys.stdin).toPEM())