Coverage for admin/sgx_utils.py: 97%
155 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-10-30 06:22 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-10-30 06:22 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23import json
24import requests
25import ecdsa
26from datetime import datetime, UTC
27from hashlib import sha256
28from cryptography import x509
29from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding
30from pyasn1.codec.der import decoder
31from pyasn1.type.univ import SequenceOf, Integer, OctetString
32from urllib.parse import unquote as url_unquote
33from .x509_utils import split_pem_certificates, get_intel_pcs_x509_crl
34from .x509_validator import X509CertificateValidator
37def _parse_asn1_extensions(extension, base_oid, spec):
38 def assert_type(id, v, t):
39 if not isinstance(v, t):
40 raise RuntimeError(f"Expected for element {id} to be a {t} "
41 f"but found a {type(v)} instead")
43 oid = str(extension[0])
44 if base_oid != oid:
45 raise RuntimeError(f"Expected finding extension with OID {base_oid} but "
46 f"found {oid} instead")
48 value = extension[1]
49 if isinstance(spec, list) or spec["type"] == "seq":
50 assert_type(base_oid, value, SequenceOf)
51 if isinstance(spec, list):
52 items = spec
53 else:
54 items = spec["items"]
55 result = {}
56 for index, item in enumerate(items):
57 xt = value[index]
58 item_oid = f"{base_oid}{item["oid"]}"
59 result[item["name"]] = _parse_asn1_extensions(xt, item_oid, item)
60 return result
62 if spec["type"] == "seq":
63 assert_type(base_oid, value, SequenceOf)
65 if spec["type"] == "bytes":
66 assert_type(base_oid, value, OctetString)
67 return bytes(value).hex()
69 if spec["type"] == "int":
70 assert_type(base_oid, value, Integer)
71 return int(value)
73 raise RuntimeError(f"Unknown spec type {spec["type"]}")
76def get_sgx_extensions(certificate):
77 BASE_OID = "1.2.840.113741.1.13.1"
78 SGX_EXTENSIONS_SPEC = [
79 {"oid": ".1", "name": "ppid", "type": "bytes"},
80 {"oid": ".2", "name": "tcb", "type": "seq", "items": [
81 {"oid": ".1", "name": "comp01", "type": "int"},
82 {"oid": ".2", "name": "comp02", "type": "int"},
83 {"oid": ".3", "name": "comp03", "type": "int"},
84 {"oid": ".4", "name": "comp04", "type": "int"},
85 {"oid": ".5", "name": "comp05", "type": "int"},
86 {"oid": ".6", "name": "comp06", "type": "int"},
87 {"oid": ".7", "name": "comp07", "type": "int"},
88 {"oid": ".8", "name": "comp08", "type": "int"},
89 {"oid": ".9", "name": "comp09", "type": "int"},
90 {"oid": ".10", "name": "comp10", "type": "int"},
91 {"oid": ".11", "name": "comp11", "type": "int"},
92 {"oid": ".12", "name": "comp12", "type": "int"},
93 {"oid": ".13", "name": "comp13", "type": "int"},
94 {"oid": ".14", "name": "comp14", "type": "int"},
95 {"oid": ".15", "name": "comp15", "type": "int"},
96 {"oid": ".16", "name": "comp16", "type": "int"},
97 {"oid": ".17", "name": "pcesvn", "type": "int"},
98 {"oid": ".18", "name": "cpusvn", "type": "bytes"},
99 ]},
100 {"oid": ".3", "name": "pceid", "type": "bytes"},
101 {"oid": ".4", "name": "fmspc", "type": "bytes"},
102 ]
104 try:
105 oid = x509.ObjectIdentifier(BASE_OID)
106 extensions = certificate.extensions.get_extension_for_oid(oid)
107 extensions = [
108 extensions.value.oid.dotted_string,
109 list(decoder.decode(extensions.value.value))[0]
110 ]
111 return _parse_asn1_extensions(extensions, BASE_OID, SGX_EXTENSIONS_SPEC)
112 except x509.extensions.ExtensionNotFound:
113 return None
116def _get_auth_json_info(url, chain_header, digest_key, root_of_trust):
117 info_res = requests.get(url)
118 if info_res.status_code != 200:
119 raise RuntimeError(f"Server replied with status {info_res.status_code}")
121 warnings = []
123 # Parse info
124 ctype = info_res.headers["Content-Type"]
125 if ctype != "application/json":
126 raise RuntimeError(f"Unknown content-type: {ctype}")
127 info = json.loads(info_res.text)
129 warning = info_res.headers.get("warning")
130 if warning is not None:
131 warning = f"Getting {url}: {warning}"
132 warnings.append(warning)
134 # Parse certification chain
135 issuer_chain = info_res.headers.get(chain_header)
136 if issuer_chain is None:
137 raise RuntimeError("No issuer certification chain in response")
139 issuer_chain = split_pem_certificates(url_unquote(issuer_chain))
140 issuer_chain = list(map(
141 lambda pem: x509.load_pem_x509_certificate(pem.encode()), issuer_chain))
143 if len(issuer_chain) < 2:
144 raise RuntimeError("Expected at least two certificates "
145 "in the issuer chain")
147 # Check root of trust is same as expected
148 ic_root = issuer_chain[-1]
149 if root_of_trust != ic_root:
150 raise RuntimeError(f"Root of trust ({root_of_trust.subject}) does not "
151 f"match root of TCB info issuer chain: {ic_root.subject}")
153 # Validate certification chain in root to leaf order
154 validator = X509CertificateValidator(get_intel_pcs_x509_crl)
155 now = datetime.now(UTC)
156 issuer = ic_root
157 for subject in reversed(issuer_chain[:-1]):
158 result = validator.validate(subject, issuer, now)
159 if not result["valid"]:
160 raise RuntimeError("Error validating TCB info issuer "
161 f"chain: {result["reason"]}")
162 warnings += result["warnings"]
163 issuer = subject
165 # Validate info signature
166 issuer = issuer_chain[0]
167 digest = sha256(json.dumps(
168 info[digest_key], indent=None, separators=(",", ":")).encode()).digest()
169 pubkey = ecdsa.VerifyingKey.from_string(issuer.public_key().public_bytes(
170 Encoding.X962, PublicFormat.CompressedPoint), ecdsa.NIST256p)
171 pubkey.verify_digest(
172 bytes.fromhex(info["signature"]),
173 digest,
174 sigdecode=ecdsa.util.sigdecode_string
175 )
177 return {
178 "info": info,
179 "warnings": warnings,
180 }
183def get_tcb_info(url, fmspc_hex, root_of_trust, update="early"):
184 try:
185 final_url = f"{url}?fmspc={fmspc_hex}&update={update}"
186 result = _get_auth_json_info(
187 final_url, "TCB-Info-Issuer-Chain", "tcbInfo", root_of_trust)
189 return {
190 "tcb_info": result["info"],
191 "warnings": result["warnings"],
192 }
193 except Exception as e:
194 raise RuntimeError(f"While fetching TCB info from {final_url}: {e}")
197def validate_tcb_info(pck_info, tcb_info):
198 try:
199 matching_level = None
200 for level in tcb_info["tcbLevels"]:
201 found = True
202 svns_info = []
203 for index, component in enumerate(level["tcb"]["sgxtcbcomponents"]):
204 comp_id = f"comp{index+1:02}"
205 pck_svn = pck_info["tcb"][comp_id]
206 tcb_svn = component["svn"]
207 if pck_svn < tcb_svn:
208 found = False
209 break
210 svns_info.append(f"Comp {index+1:02}: {pck_svn} >= {tcb_svn}")
212 if not found:
213 continue
215 pck_pcesvn = pck_info["tcb"]["pcesvn"]
216 tcb_pcesvn = level["tcb"]["pcesvn"]
217 if pck_pcesvn >= tcb_pcesvn:
218 svns_info.append(f"PCESVN: {pck_pcesvn} >= {tcb_pcesvn}")
219 matching_level = level
220 break
222 if matching_level is None:
223 return {
224 "valid": False,
225 "reason": "TCB level is unsupported"
226 }
228 return {
229 "valid": True,
230 "status": matching_level["tcbStatus"],
231 "date": matching_level["tcbDate"],
232 "advisories": matching_level["advisoryIDs"],
233 "svns": svns_info,
234 "edn": tcb_info["tcbEvaluationDataNumber"],
235 }
236 except Exception as e:
237 return {
238 "valid": False,
239 "reason": f"While validating TCB information: {e}",
240 }
243def get_qeid_info(url, root_of_trust, update="early"):
244 try:
245 final_url = f"{url}?update={update}"
246 result = _get_auth_json_info(
247 final_url, "SGX-Enclave-Identity-Issuer-Chain",
248 "enclaveIdentity", root_of_trust)
250 return {
251 "qeid_info": result["info"],
252 "warnings": result["warnings"],
253 }
254 except Exception as e:
255 raise RuntimeError(f"While fetching QE identity info from {final_url}: {e}")
258def validate_qeid_info(report_info, qeid_info):
259 def fail(message):
260 return {
261 "valid": False,
262 "reason": message
263 }
265 try:
266 if report_info.mrsigner != bytes.fromhex(qeid_info["mrsigner"]):
267 return fail("QE MRSIGNER does not match the Intel QE identity information")
269 if report_info.isvprodid != qeid_info["isvprodid"]:
270 return fail("QE ISVPRODID does not match the Intel QE identity information")
272 masked_miscselect = report_info.miscselect & \
273 int.from_bytes(bytes.fromhex(qeid_info["miscselectMask"]),
274 byteorder="little", signed=False)
275 miscselect = int.from_bytes(
276 bytes.fromhex(qeid_info["miscselect"]), byteorder="little", signed=False)
277 if masked_miscselect != miscselect:
278 return fail("QE MISCSELECT does not match the Intel QE identity information")
280 report_attributes = int.from_bytes(
281 report_info.attributes.flags.to_bytes(8, byteorder="little") +
282 report_info.attributes.xfrm.to_bytes(8, byteorder="little"),
283 byteorder="little", signed=False)
284 masked_attributes = report_attributes & \
285 int.from_bytes(bytes.fromhex(qeid_info["attributesMask"]),
286 byteorder="little", signed=False)
287 attributes = int.from_bytes(
288 bytes.fromhex(qeid_info["attributes"]), byteorder="little", signed=False)
289 if masked_attributes != attributes:
290 return fail("QE ATTRIBUTES does not match the Intel QE identity information")
292 matching_level = None
293 for level in qeid_info["tcbLevels"]:
294 if report_info.isvsvn >= level["tcb"]["isvsvn"]:
295 matching_level = level
296 break
298 if matching_level is None:
299 return fail("QE TCB level is not supported")
301 return {
302 "valid": True,
303 "status": matching_level["tcbStatus"],
304 "date": matching_level["tcbDate"],
305 "advisories": matching_level.get("advisoryIDs", []),
306 "isvsvn": f"{report_info.isvsvn} >= {matching_level["tcb"]["isvsvn"]}",
307 "edn": qeid_info["tcbEvaluationDataNumber"],
308 }
309 except Exception as e:
310 return {
311 "valid": False,
312 "reason": f"While validating QE identity information: {e}",
313 }