Coverage for admin/certificate_v2.py: 98%
199 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-11-18 02:59 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-11-18 02:59 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23import re
24import base64
25import ecdsa
26import hashlib
27from datetime import datetime, UTC
28from pathlib import Path
29from cryptography import x509
30from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding
31from cryptography.hazmat.primitives.asymmetric import ec
32from .certificate_v1 import HSMCertificate
33from .utils import is_nonempty_hex_string
34from .misc import info
35from sgx.envelope import SgxQuote, SgxReportBody
class HSMCertificateV2Element:
    """Abstract base for the elements of a version 2 HSM certificate.

    Concrete element classes are selected through ``TYPE_MAPPING``, a class
    attribute that is not defined in this class body; it must be assigned
    on the class before ``from_dict`` is called.
    """

    def __init__(self):
        # The base class itself is never meant to be instantiated.
        raise NotImplementedError("Cannot instantiate a HSMCertificateV2Element")

    @classmethod
    def from_dict(cls, element_map):
        """Build the concrete element selected by the map's ``"type"`` entry.

        Raises:
            ValueError: if the type is absent or is not a key of
                ``TYPE_MAPPING``.
        """
        element_type = element_map.get("type")
        if element_type in cls.TYPE_MAPPING:
            return cls.TYPE_MAPPING[element_type](element_map)

        raise ValueError("Invalid or missing element type for "
                         f"element {element_map.get('name')}")

    def _init_with_map(self, element_map):
        """Load the fields common to all elements: name and certifier name.

        Raises:
            ValueError: if ``"name"`` or ``"signed_by"`` is missing.
        """
        # (map key, target attribute, error raised when the key is absent);
        # processed in order, so a missing name is reported first.
        mandatory = (
            ("name", "_name",
             "Missing name for HSM certificate element"),
            ("signed_by", "_signed_by",
             "Missing certifier for HSM certificate element"),
        )
        for key, attribute, error in mandatory:
            if key not in element_map:
                raise ValueError(error)
            setattr(self, attribute, element_map[key])

    @property
    def name(self):
        """Name of this element."""
        return self._name

    @property
    def signed_by(self):
        """Name of the element recorded as certifier of this one."""
        return self._signed_by

    def get_value(self):
        """Not available on the base class; subclasses may override."""
        kind = type(self).__name__
        raise NotImplementedError(f"{kind} can't provide a value")

    def get_pubkey(self):
        """Not available on the base class; subclasses may override."""
        kind = type(self).__name__
        raise NotImplementedError(f"{kind} can't provide a public key")

    def is_valid(self, certifier):
        """Not available on the base class; subclasses may override."""
        kind = type(self).__name__
        raise NotImplementedError(f"{kind} can't be queried for validity")

    def get_tweak(self):
        """Return None: the base element has no tweak."""
        return None

    def get_collateral(self):
        """Return None: the base element has no collateral."""
        return None
class HSMCertificateV2ElementSGXQuote(HSMCertificateV2Element):
    """Certificate element holding an SGX quote, custom data and a signature.

    The element is built from a map with the hex-encoded fields ``message``
    (the raw quote), ``custom_data`` and ``signature``, plus the common
    ``name`` and ``signed_by`` fields handled by the base class.
    """

    def __init__(self, element_map):
        self._init_with_map(element_map)

    def _init_with_map(self, element_map):
        """Validate and decode the hex fields of ``element_map``.

        Raises:
            ValueError: if a common field is missing, or if ``message``,
                ``custom_data`` or ``signature`` is rejected by
                ``is_nonempty_hex_string``.
        """
        super()._init_with_map(element_map)

        if not is_nonempty_hex_string(element_map.get("message")):
            raise ValueError(
                f"Invalid message for HSM certificate element {self.name}")
        self._message = bytes.fromhex(element_map["message"])

        if not is_nonempty_hex_string(element_map.get("custom_data")):
            raise ValueError("Invalid custom data for HSM certificate "
                             f"element {self.name}")
        self._custom_data = bytes.fromhex(element_map["custom_data"])

        if not is_nonempty_hex_string(element_map.get("signature")):
            # This message must be an f-string so the element name is
            # interpolated, as in the other validation errors.
            raise ValueError(
                f"Invalid signature for HSM certificate element {self.name}")
        self._signature = bytes.fromhex(element_map["signature"])

    @property
    def message(self):
        """The raw message bytes wrapped in an ``SgxQuote`` (built per access)."""
        return SgxQuote(self._message)

    @property
    def custom_data(self):
        """Custom data as a hex string."""
        return self._custom_data.hex()

    @property
    def signature(self):
        """Signature as a hex string."""
        return self._signature.hex()

    def is_valid(self, certifier):
        """Check this element against ``certifier``.

        Two conditions must hold: the SHA-256 digest of the custom data must
        equal the leading bytes of the quote's report data, and the
        DER-encoded signature must verify over the SHA-256 digest of the raw
        message using the certifier's public key.

        Returns:
            The result of the signature verification when the custom data
            matches; False when it does not match or when any exception is
            raised along the way (deliberate best-effort: no error escapes).
        """
        try:
            # Validate custom data
            expected = hashlib.sha256(self._custom_data).digest()
            report_data = self.message.report_body.report_data.field
            if expected != report_data[:len(expected)]:
                return False

            # Verify signature against the certifier
            return certifier.get_pubkey().verify_digest(
                self._signature,
                hashlib.sha256(self._message).digest(),
                ecdsa.util.sigdecode_der,
            )
        except Exception:
            return False

    def get_value(self):
        """Return the parsed quote and the hex custom data as a dict."""
        return {
            "sgx_quote": self.message,
            "message": self.custom_data,
        }

    def to_dict(self):
        """Serialize the element back to a map of hex strings."""
        return {
            "name": self.name,
            "type": "sgx_quote",
            "message": self._message.hex(),
            "custom_data": self.custom_data,
            "signature": self.signature,
            "signed_by": self.signed_by,
        }
class HSMCertificateV2ElementSGXAttestationKey(HSMCertificateV2Element):
    """Certificate element holding an SGX attestation key.

    Built from a map with the hex-encoded fields ``message`` (a raw report
    body), ``key``, ``auth_data`` and ``signature``, plus the common ``name``
    and ``signed_by`` fields handled by the base class.
    """

    def __init__(self, element_map):
        self._init_with_map(element_map)

    def _init_with_map(self, element_map):
        """Validate and decode the hex fields of ``element_map``.

        Raises:
            ValueError: if a common field is missing or one of the hex
                fields is rejected by ``is_nonempty_hex_string``.
        """
        super()._init_with_map(element_map)

        # (map key, target attribute, label used in the error message);
        # checked in this order, so the first offending field is reported.
        hex_fields = (
            ("message", "_message", "message"),
            ("key", "_key", "key"),
            ("auth_data", "_auth_data", "auth data"),
            ("signature", "_signature", "signature"),
        )
        for field, attribute, label in hex_fields:
            if not is_nonempty_hex_string(element_map.get(field)):
                raise ValueError(
                    f"Invalid {label} for HSM certificate element {self.name}")
            setattr(self, attribute, bytes.fromhex(element_map[field]))

    @property
    def message(self):
        """The raw message wrapped in an ``SgxReportBody`` (built per access)."""
        return SgxReportBody(self._message)

    @property
    def key(self):
        """The key bytes parsed as an ``ecdsa.VerifyingKey`` on NIST P-256."""
        return ecdsa.VerifyingKey.from_string(self._key, ecdsa.NIST256p)

    @property
    def auth_data(self):
        """Auth data as a hex string."""
        return self._auth_data.hex()

    @property
    def signature(self):
        """Signature as a hex string."""
        return self._signature.hex()

    def is_valid(self, certifier):
        """Check this element against ``certifier``.

        The SHA-256 digest of the key's ``to_string()`` bytes followed by the
        auth data must equal the leading bytes of the report data, and the
        DER-encoded signature must verify over the SHA-256 digest of the raw
        message using the certifier's public key. Any exception yields False.
        """
        try:
            # Validate report data
            key_digest = hashlib.sha256(
                self.key.to_string() + self._auth_data).digest()
            reported = self.message.report_data.field[:len(key_digest)]
            if reported != key_digest:
                return False

            # Verify signature against the certifier
            message_digest = hashlib.sha256(self._message).digest()
            verifier = certifier.get_pubkey()
            return verifier.verify_digest(
                self._signature, message_digest, ecdsa.util.sigdecode_der)
        except Exception:
            return False

    def get_pubkey(self):
        """Return the attestation key as an ``ecdsa.VerifyingKey``."""
        return self.key

    def to_dict(self):
        """Serialize the element to a map of hex strings.

        The key is written in its "uncompressed" string encoding.
        """
        serialized = {"name": self.name, "type": "sgx_attestation_key"}
        serialized["message"] = self.message.get_raw_data().hex()
        serialized["key"] = self.key.to_string("uncompressed").hex()
        serialized["auth_data"] = self.auth_data
        serialized["signature"] = self.signature
        serialized["signed_by"] = self.signed_by
        return serialized

    def get_collateral(self):
        """Return the parsed report body as this element's collateral."""
        return self.message
class HSMCertificateV2ElementX509(HSMCertificateV2Element):
    """Certificate element wrapping an X.509 certificate.

    The certificate body is kept as decoded bytes and exposed again as
    base64 text through ``message``. A certificate validator and a
    collateral getter are injected at class level through the ``set_*``
    classmethods.
    """

    HEADER_BEGIN = "-----BEGIN CERTIFICATE-----"
    HEADER_END = "-----END CERTIFICATE-----"

    # Injected helper; is_valid() calls its validate() method.
    _certificate_validator = None

    @classmethod
    def set_certificate_validator(cls, certificate_validator):
        """Inject the validator object used by ``is_valid``."""
        cls._certificate_validator = certificate_validator

    # Injected callable; get_collateral() calls it with the parsed certificate.
    _collateral_getter = None

    @classmethod
    def set_collateral_getter(cls, collateral_getter):
        """Inject the callable used by ``get_collateral``."""
        cls._collateral_getter = collateral_getter

    @classmethod
    def from_pemfile(cls, pem_path, name, signed_by):
        """Build an element from the PEM text stored at ``pem_path``."""
        return cls.from_pem(Path(pem_path).read_text(), name, signed_by)

    @classmethod
    def from_pem(cls, pem_str, name, signed_by):
        """Build an element from PEM text.

        Whitespace runs are collapsed to single spaces and the BEGIN/END
        header lines are removed; what remains is handed to the constructor
        as the base64 message.
        """
        return cls({
            "name": name,
            "message": re.sub(r"[\s\n\r]+", " ", pem_str)
                         .replace(cls.HEADER_END, "")
                         .replace(cls.HEADER_BEGIN, "")
                         .strip().encode(),
            "signed_by": signed_by,
        })

    def __init__(self, element_map):
        self._init_with_map(element_map)
        # Parsed certificate, filled in lazily by the `certificate` property.
        self._certificate = None

    def _init_with_map(self, element_map):
        """Decode the base64 ``message`` field of ``element_map``.

        Raises:
            ValueError: if a common field is missing or the message cannot
                be base64-decoded.
        """
        super()._init_with_map(element_map)

        try:
            self._message = base64.b64decode(element_map.get("message"))
        except Exception as exc:
            raise ValueError(
                f"Invalid message for HSM certificate element {self.name}"
            ) from exc

    @property
    def message(self):
        """The certificate body re-encoded as base64 ASCII text."""
        return base64.b64encode(self._message).decode("ASCII")

    @property
    def certificate(self):
        """The parsed X.509 certificate (parsed on first access, then cached)."""
        if self._certificate is None:
            self._certificate = x509.load_pem_x509_certificate((
                self.HEADER_BEGIN + self.message + self.HEADER_END).encode())
        return self._certificate

    @property
    def is_root_of_trust(self):
        """True when the element names itself as its certifier."""
        return self.name == self.signed_by

    def is_valid(self, certifier):
        """Validate this certificate against ``certifier``.

        Returns:
            True when the injected validator reports the certificate valid
            (warnings, if any, are printed through ``info``); False
            otherwise, after printing the reported reason.

        Raises:
            RuntimeError: if ``certifier`` is not an instance of this
                element's type, or if no validator has been injected.
        """
        # IMPORTANT: for now, we only allow verifying the validity of an
        # HSMCertificateV2ElementX509 using another HSMCertificateV2ElementX509
        # instance as certifier. That way, we can simplify the validation
        # procedure by using a helper X509 validator and therefore ensure
        # maximum use of the underlying library's capabilities (cryptography).
        if not isinstance(certifier, type(self)):
            raise RuntimeError(f"Invalid certifier given for {type(self)} validation")

        # Certificate validator must be injected
        if self._certificate_validator is None:
            raise RuntimeError("Certificate validator not set")

        subject = self.certificate
        issuer = certifier.certificate
        now = datetime.now(UTC)

        # The CRL check is skipped for the root of trust.
        result = self._certificate_validator.validate(
            subject, issuer, now, check_crl=not self.is_root_of_trust)

        if not result["valid"]:
            # TODO: find a better way of showing this
            # Single quotes inside the replacement field keep this f-string
            # parseable on interpreters older than Python 3.12.
            info(f"While validating element {self.name}: {result['reason']}")
            return False

        # TODO: find a better way of showing this
        if len(result["warnings"]) > 0:
            info("***** WARNINGS *****")
            for warning in result["warnings"]:
                info(warning)
            info("********************")

        return True

    def get_pubkey(self):
        """Return the certificate's public key as an ``ecdsa.VerifyingKey``.

        Raises:
            ValueError: if the key's curve is not SECP256R1, or if any other
                error occurs while extracting or converting the key.
        """
        try:
            public_key = self.certificate.public_key()

            if not isinstance(public_key.curve, ec.SECP256R1):
                raise ValueError("Certificate does not have a NIST P-256 public key")

            public_bytes = public_key.public_bytes(
                Encoding.X962, PublicFormat.CompressedPoint)

            return ecdsa.VerifyingKey.from_string(public_bytes, ecdsa.NIST256p)
        except Exception as e:
            raise ValueError(
                f"Error gathering public key from certificate: {str(e)}"
            ) from e

    def to_dict(self):
        """Serialize the element to a map with the base64 message."""
        return {
            "name": self.name,
            "type": "x509_pem",
            "message": self.message,
            "signed_by": self.signed_by,
        }

    def get_collateral(self):
        """Return whatever the injected collateral getter yields for the certificate.

        Raises:
            RuntimeError: if no collateral getter has been injected.
        """
        if type(self)._collateral_getter is None:
            raise RuntimeError("Collateral getter for X509 certificate elements not set")

        # Fetch from the class so the stored callable is called unbound.
        cg = type(self)._collateral_getter
        return cg(self.certificate)
# Element type registry: maps the "type" entry of a serialized element to the
# class that HSMCertificateV2Element.from_dict instantiates for it.
HSMCertificateV2Element.TYPE_MAPPING = dict(
    sgx_quote=HSMCertificateV2ElementSGXQuote,
    sgx_attestation_key=HSMCertificateV2ElementSGXAttestationKey,
    x509_pem=HSMCertificateV2ElementX509,
)
352class HSMCertificateV2(HSMCertificate):
353 VERSION = 2
354 ROOT_ELEMENT = "sgx_root"
355 ELEMENT_BASE_CLASS = HSMCertificateV2Element
356 ELEMENT_FACTORY = HSMCertificateV2Element.from_dict