Coverage for admin/sgx_utils.py: 97%

155 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-10-30 06:22 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23import json 

24import requests 

25import ecdsa 

26from datetime import datetime, UTC 

27from hashlib import sha256 

28from cryptography import x509 

29from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding 

30from pyasn1.codec.der import decoder 

31from pyasn1.type.univ import SequenceOf, Integer, OctetString 

32from urllib.parse import unquote as url_unquote 

33from .x509_utils import split_pem_certificates, get_intel_pcs_x509_crl 

34from .x509_validator import X509CertificateValidator 

35 

36 

37def _parse_asn1_extensions(extension, base_oid, spec): 

38 def assert_type(id, v, t): 

39 if not isinstance(v, t): 

40 raise RuntimeError(f"Expected for element {id} to be a {t} " 

41 f"but found a {type(v)} instead") 

42 

43 oid = str(extension[0]) 

44 if base_oid != oid: 

45 raise RuntimeError(f"Expected finding extension with OID {base_oid} but " 

46 f"found {oid} instead") 

47 

48 value = extension[1] 

49 if isinstance(spec, list) or spec["type"] == "seq": 

50 assert_type(base_oid, value, SequenceOf) 

51 if isinstance(spec, list): 

52 items = spec 

53 else: 

54 items = spec["items"] 

55 result = {} 

56 for index, item in enumerate(items): 

57 xt = value[index] 

58 item_oid = f"{base_oid}{item["oid"]}" 

59 result[item["name"]] = _parse_asn1_extensions(xt, item_oid, item) 

60 return result 

61 

62 if spec["type"] == "seq": 

63 assert_type(base_oid, value, SequenceOf) 

64 

65 if spec["type"] == "bytes": 

66 assert_type(base_oid, value, OctetString) 

67 return bytes(value).hex() 

68 

69 if spec["type"] == "int": 

70 assert_type(base_oid, value, Integer) 

71 return int(value) 

72 

73 raise RuntimeError(f"Unknown spec type {spec["type"]}") 

74 

75 

76def get_sgx_extensions(certificate): 

77 BASE_OID = "1.2.840.113741.1.13.1" 

78 SGX_EXTENSIONS_SPEC = [ 

79 {"oid": ".1", "name": "ppid", "type": "bytes"}, 

80 {"oid": ".2", "name": "tcb", "type": "seq", "items": [ 

81 {"oid": ".1", "name": "comp01", "type": "int"}, 

82 {"oid": ".2", "name": "comp02", "type": "int"}, 

83 {"oid": ".3", "name": "comp03", "type": "int"}, 

84 {"oid": ".4", "name": "comp04", "type": "int"}, 

85 {"oid": ".5", "name": "comp05", "type": "int"}, 

86 {"oid": ".6", "name": "comp06", "type": "int"}, 

87 {"oid": ".7", "name": "comp07", "type": "int"}, 

88 {"oid": ".8", "name": "comp08", "type": "int"}, 

89 {"oid": ".9", "name": "comp09", "type": "int"}, 

90 {"oid": ".10", "name": "comp10", "type": "int"}, 

91 {"oid": ".11", "name": "comp11", "type": "int"}, 

92 {"oid": ".12", "name": "comp12", "type": "int"}, 

93 {"oid": ".13", "name": "comp13", "type": "int"}, 

94 {"oid": ".14", "name": "comp14", "type": "int"}, 

95 {"oid": ".15", "name": "comp15", "type": "int"}, 

96 {"oid": ".16", "name": "comp16", "type": "int"}, 

97 {"oid": ".17", "name": "pcesvn", "type": "int"}, 

98 {"oid": ".18", "name": "cpusvn", "type": "bytes"}, 

99 ]}, 

100 {"oid": ".3", "name": "pceid", "type": "bytes"}, 

101 {"oid": ".4", "name": "fmspc", "type": "bytes"}, 

102 ] 

103 

104 try: 

105 oid = x509.ObjectIdentifier(BASE_OID) 

106 extensions = certificate.extensions.get_extension_for_oid(oid) 

107 extensions = [ 

108 extensions.value.oid.dotted_string, 

109 list(decoder.decode(extensions.value.value))[0] 

110 ] 

111 return _parse_asn1_extensions(extensions, BASE_OID, SGX_EXTENSIONS_SPEC) 

112 except x509.extensions.ExtensionNotFound: 

113 return None 

114 

115 

116def _get_auth_json_info(url, chain_header, digest_key, root_of_trust): 

117 info_res = requests.get(url) 

118 if info_res.status_code != 200: 

119 raise RuntimeError(f"Server replied with status {info_res.status_code}") 

120 

121 warnings = [] 

122 

123 # Parse info 

124 ctype = info_res.headers["Content-Type"] 

125 if ctype != "application/json": 

126 raise RuntimeError(f"Unknown content-type: {ctype}") 

127 info = json.loads(info_res.text) 

128 

129 warning = info_res.headers.get("warning") 

130 if warning is not None: 

131 warning = f"Getting {url}: {warning}" 

132 warnings.append(warning) 

133 

134 # Parse certification chain 

135 issuer_chain = info_res.headers.get(chain_header) 

136 if issuer_chain is None: 

137 raise RuntimeError("No issuer certification chain in response") 

138 

139 issuer_chain = split_pem_certificates(url_unquote(issuer_chain)) 

140 issuer_chain = list(map( 

141 lambda pem: x509.load_pem_x509_certificate(pem.encode()), issuer_chain)) 

142 

143 if len(issuer_chain) < 2: 

144 raise RuntimeError("Expected at least two certificates " 

145 "in the issuer chain") 

146 

147 # Check root of trust is same as expected 

148 ic_root = issuer_chain[-1] 

149 if root_of_trust != ic_root: 

150 raise RuntimeError(f"Root of trust ({root_of_trust.subject}) does not " 

151 f"match root of TCB info issuer chain: {ic_root.subject}") 

152 

153 # Validate certification chain in root to leaf order 

154 validator = X509CertificateValidator(get_intel_pcs_x509_crl) 

155 now = datetime.now(UTC) 

156 issuer = ic_root 

157 for subject in reversed(issuer_chain[:-1]): 

158 result = validator.validate(subject, issuer, now) 

159 if not result["valid"]: 

160 raise RuntimeError("Error validating TCB info issuer " 

161 f"chain: {result["reason"]}") 

162 warnings += result["warnings"] 

163 issuer = subject 

164 

165 # Validate info signature 

166 issuer = issuer_chain[0] 

167 digest = sha256(json.dumps( 

168 info[digest_key], indent=None, separators=(",", ":")).encode()).digest() 

169 pubkey = ecdsa.VerifyingKey.from_string(issuer.public_key().public_bytes( 

170 Encoding.X962, PublicFormat.CompressedPoint), ecdsa.NIST256p) 

171 pubkey.verify_digest( 

172 bytes.fromhex(info["signature"]), 

173 digest, 

174 sigdecode=ecdsa.util.sigdecode_string 

175 ) 

176 

177 return { 

178 "info": info, 

179 "warnings": warnings, 

180 } 

181 

182 

183def get_tcb_info(url, fmspc_hex, root_of_trust, update="early"): 

184 try: 

185 final_url = f"{url}?fmspc={fmspc_hex}&update={update}" 

186 result = _get_auth_json_info( 

187 final_url, "TCB-Info-Issuer-Chain", "tcbInfo", root_of_trust) 

188 

189 return { 

190 "tcb_info": result["info"], 

191 "warnings": result["warnings"], 

192 } 

193 except Exception as e: 

194 raise RuntimeError(f"While fetching TCB info from {final_url}: {e}") 

195 

196 

197def validate_tcb_info(pck_info, tcb_info): 

198 try: 

199 matching_level = None 

200 for level in tcb_info["tcbLevels"]: 

201 found = True 

202 svns_info = [] 

203 for index, component in enumerate(level["tcb"]["sgxtcbcomponents"]): 

204 comp_id = f"comp{index+1:02}" 

205 pck_svn = pck_info["tcb"][comp_id] 

206 tcb_svn = component["svn"] 

207 if pck_svn < tcb_svn: 

208 found = False 

209 break 

210 svns_info.append(f"Comp {index+1:02}: {pck_svn} >= {tcb_svn}") 

211 

212 if not found: 

213 continue 

214 

215 pck_pcesvn = pck_info["tcb"]["pcesvn"] 

216 tcb_pcesvn = level["tcb"]["pcesvn"] 

217 if pck_pcesvn >= tcb_pcesvn: 

218 svns_info.append(f"PCESVN: {pck_pcesvn} >= {tcb_pcesvn}") 

219 matching_level = level 

220 break 

221 

222 if matching_level is None: 

223 return { 

224 "valid": False, 

225 "reason": "TCB level is unsupported" 

226 } 

227 

228 return { 

229 "valid": True, 

230 "status": matching_level["tcbStatus"], 

231 "date": matching_level["tcbDate"], 

232 "advisories": matching_level["advisoryIDs"], 

233 "svns": svns_info, 

234 "edn": tcb_info["tcbEvaluationDataNumber"], 

235 } 

236 except Exception as e: 

237 return { 

238 "valid": False, 

239 "reason": f"While validating TCB information: {e}", 

240 } 

241 

242 

243def get_qeid_info(url, root_of_trust, update="early"): 

244 try: 

245 final_url = f"{url}?update={update}" 

246 result = _get_auth_json_info( 

247 final_url, "SGX-Enclave-Identity-Issuer-Chain", 

248 "enclaveIdentity", root_of_trust) 

249 

250 return { 

251 "qeid_info": result["info"], 

252 "warnings": result["warnings"], 

253 } 

254 except Exception as e: 

255 raise RuntimeError(f"While fetching QE identity info from {final_url}: {e}") 

256 

257 

258def validate_qeid_info(report_info, qeid_info): 

259 def fail(message): 

260 return { 

261 "valid": False, 

262 "reason": message 

263 } 

264 

265 try: 

266 if report_info.mrsigner != bytes.fromhex(qeid_info["mrsigner"]): 

267 return fail("QE MRSIGNER does not match the Intel QE identity information") 

268 

269 if report_info.isvprodid != qeid_info["isvprodid"]: 

270 return fail("QE ISVPRODID does not match the Intel QE identity information") 

271 

272 masked_miscselect = report_info.miscselect & \ 

273 int.from_bytes(bytes.fromhex(qeid_info["miscselectMask"]), 

274 byteorder="little", signed=False) 

275 miscselect = int.from_bytes( 

276 bytes.fromhex(qeid_info["miscselect"]), byteorder="little", signed=False) 

277 if masked_miscselect != miscselect: 

278 return fail("QE MISCSELECT does not match the Intel QE identity information") 

279 

280 report_attributes = int.from_bytes( 

281 report_info.attributes.flags.to_bytes(8, byteorder="little") + 

282 report_info.attributes.xfrm.to_bytes(8, byteorder="little"), 

283 byteorder="little", signed=False) 

284 masked_attributes = report_attributes & \ 

285 int.from_bytes(bytes.fromhex(qeid_info["attributesMask"]), 

286 byteorder="little", signed=False) 

287 attributes = int.from_bytes( 

288 bytes.fromhex(qeid_info["attributes"]), byteorder="little", signed=False) 

289 if masked_attributes != attributes: 

290 return fail("QE ATTRIBUTES does not match the Intel QE identity information") 

291 

292 matching_level = None 

293 for level in qeid_info["tcbLevels"]: 

294 if report_info.isvsvn >= level["tcb"]["isvsvn"]: 

295 matching_level = level 

296 break 

297 

298 if matching_level is None: 

299 return fail("QE TCB level is not supported") 

300 

301 return { 

302 "valid": True, 

303 "status": matching_level["tcbStatus"], 

304 "date": matching_level["tcbDate"], 

305 "advisories": matching_level.get("advisoryIDs", []), 

306 "isvsvn": f"{report_info.isvsvn} >= {matching_level["tcb"]["isvsvn"]}", 

307 "edn": qeid_info["tcbEvaluationDataNumber"], 

308 } 

309 except Exception as e: 

310 return { 

311 "valid": False, 

312 "reason": f"While validating QE identity information: {e}", 

313 }