1 """ndg_httpsclient - module containing SSL peer verification class.
2 """
3 __author__ = "P J Kershaw (STFC)"
4 __date__ = "09/12/11"
5 __copyright__ = "(C) 2012 Science and Technology Facilities Council"
6 __license__ = "BSD - see LICENSE file in top-level directory"
7 __contact__ = "Philip.Kershaw@stfc.ac.uk"
8 __revision__ = '$Id$'
9 import re
10 import logging
11 log = logging.getLogger(__name__)
12
13 try:
14 from ndg.httpsclient.subj_alt_name import SubjectAltName
15 from pyasn1.codec.der import decoder as der_decoder
16 SUBJ_ALT_NAME_SUPPORT = True
17
18 except ImportError as e:
19 SUBJ_ALT_NAME_SUPPORT = False
20 SUBJ_ALT_NAME_SUPPORT_MSG = (
21 'SubjectAltName support is disabled - check pyasn1 package '
22 'installation to enable'
23 )
24 import warnings
25 warnings.warn(SUBJ_ALT_NAME_SUPPORT_MSG)
29 """Check server identity. If hostname doesn't match, allow match of
30 host's Distinguished Name against server DN setting"""
31 DN_LUT = {
32 'commonName': 'CN',
33 'organisationalUnitName': 'OU',
34 'organisation': 'O',
35 'countryName': 'C',
36 'emailAddress': 'EMAILADDRESS',
37 'localityName': 'L',
38 'stateOrProvinceName': 'ST',
39 'streetAddress': 'STREET',
40 'domainComponent': 'DC',
41 'userid': 'UID'
42 }
43 SUBJ_ALT_NAME_EXT_NAME = 'subjectAltName'
44 PARSER_RE_STR = '/(%s)=' % '|'.join(list(DN_LUT.keys()) + \
45 list(DN_LUT.values()))
46 PARSER_RE = re.compile(PARSER_RE_STR)
47
48 __slots__ = ('__hostname', '__certDN', '__subj_alt_name_match')
49
51 """Override parent class __init__ to enable setting of certDN
52 setting
53
54 @type certDN: string
55 @param certDN: Set the expected Distinguished Name of the
56 server to avoid errors matching hostnames. This is useful
57 where the hostname is not fully qualified
58 @type hostname: string
59 @param hostname: hostname to match against peer certificate
60 subjectAltNames or subject common name
61 @type subj_alt_name_match: bool
62 @param subj_alt_name_match: flag to enable/disable matching of hostname
63 against peer certificate subjectAltNames. Nb. A setting of True will
64 be ignored if the pyasn1 package is not installed
65 """
66 self.__certDN = None
67 self.__hostname = None
68
69 if certDN is not None:
70 self.certDN = certDN
71
72 if hostname is not None:
73 self.hostname = hostname
74
75 if subj_alt_name_match:
76 if not SUBJ_ALT_NAME_SUPPORT:
77 log.warning('Overriding "subj_alt_name_match" keyword setting: '
78 'peer verification with subjectAltNames is disabled')
79 self.__subj_alt_name_match = False
80 else:
81 self.__subj_alt_name_match = True
82 else:
83 log.debug('Disabling peer verification with subject '
84 'subjectAltNames!')
85 self.__subj_alt_name_match = False
86
89 """Verify server certificate
90
91 @type connection: OpenSSL.SSL.Connection
92 @param connection: SSL connection object
93 @type peerCert: basestring
94 @param peerCert: server host certificate as OpenSSL.crypto.X509
95 instance
96 @type errorStatus: int
97 @param errorStatus: error status passed from caller. This is the value
98 returned by the OpenSSL C function X509_STORE_CTX_get_error(). Look-up
99 x509_vfy.h in the OpenSSL source to get the meanings of the different
100 codes. PyOpenSSL doesn't help you!
101 @type errorDepth: int
102 @param errorDepth: a non-negative integer representing where in the
103 certificate chain the error occurred. If it is zero it occured in the
104 end entity certificate, one if it is the certificate which signed the
105 end entity certificate and so on.
106
107 @type preverifyOK: int
108 @param preverifyOK: the error status - 0 = Error, 1 = OK of the current
109 SSL context irrespective of any verification checks done here. If this
110 function yields an OK status, it should enforce the preverifyOK value
111 so that any error set upstream overrides and is honoured.
112 @rtype: int
113 @return: status code - 0/False = Error, 1/True = OK
114 """
115 if peerCert.has_expired():
116
117 log.error('Certificate %r in peer certificate chain has expired',
118 peerCert.get_subject())
119
120 return False
121
122 elif errorDepth == 0:
123
124
125 peerCertSubj = peerCert.get_subject()
126 peerCertDN = peerCertSubj.get_components()
127 peerCertDN.sort()
128
129 if self.certDN is None:
130
131 if self.hostname is None:
132 log.error('No "hostname" or "certDN" set to check peer '
133 'certificate against')
134 return False
135
136
137 if self.__subj_alt_name_match:
138 dns_names = self._get_subj_alt_name(peerCert)
139 if self.hostname in dns_names:
140 return preverifyOK
141
142
143 if peerCertSubj.commonName == self.hostname:
144 return preverifyOK
145 else:
146 log.error('Peer certificate CN %r doesn\'t match the '
147 'expected CN %r', peerCertSubj.commonName,
148 self.hostname)
149 return False
150 else:
151 if peerCertDN == self.certDN:
152 return preverifyOK
153 else:
154 log.error('Peer certificate DN %r doesn\'t match the '
155 'expected DN %r', peerCertDN, self.certDN)
156 return False
157 else:
158 return preverifyOK
159
161 def verify_server_cert(connection, peerCert, errorStatus, errorDepth,
162 preverifyOK):
163 return self.__call__(connection, peerCert, errorStatus,
164 errorDepth, preverifyOK)
165
166 return verify_server_cert
167
168 @classmethod
170 '''Extract subjectAltName DNS name settings from certificate extensions
171
172 @param peer_cert: peer certificate in SSL connection. subjectAltName
173 settings if any will be extracted from this
174 @type peer_cert: OpenSSL.crypto.X509
175 '''
176
177 dns_name = []
178 general_names = SubjectAltName()
179 for i in range(peer_cert.get_extension_count()):
180 ext = peer_cert.get_extension(i)
181 ext_name = ext.get_short_name()
182 if ext_name == cls.SUBJ_ALT_NAME_EXT_NAME:
183
184 ext_dat = ext.get_data()
185 decoded_dat = der_decoder.decode(ext_dat,
186 asn1Spec=general_names)
187
188 for name in decoded_dat:
189 if isinstance(name, SubjectAltName):
190 for entry in range(len(name)):
191 component = name.getComponentByPosition(entry)
192 dns_name.append(str(component.getComponent()))
193
194 return dns_name
195
198
200 if isinstance(val, str):
201
202 certDN = val.strip('"')
203
204 dnFields = self.__class__.PARSER_RE.split(certDN)
205 if len(dnFields) < 2:
206 raise TypeError('Error parsing DN string: "%s"' % certDN)
207
208 self.__certDN = list(zip(dnFields[1::2], dnFields[2::2]))
209 self.__certDN.sort()
210
211 elif not isinstance(val, list):
212 for i in val:
213 if not len(i) == 2:
214 raise TypeError('Expecting list of two element DN field, '
215 'DN field value pairs for "certDN" '
216 'attribute')
217 self.__certDN = val
218 else:
219 raise TypeError('Expecting list or string type for "certDN" '
220 'attribute')
221
222 certDN = property(fget=_getCertDN,
223 fset=_setCertDN,
224 doc="Distinguished Name for Server Certificate")
225
226
229
231 if not isinstance(val, str):
232 raise TypeError("Expecting string type for hostname "
233 "attribute")
234 self.__hostname = val
235
236 hostname = property(fget=_getHostname,
237 fset=_setHostname,
238 doc="hostname of server")
239