Coverage for admin/certificate_v2.py: 98%

173 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23import re 

24import base64 

25import ecdsa 

26import hashlib 

27from datetime import datetime, UTC 

28from pathlib import Path 

29from cryptography import x509 

30from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding 

31from cryptography.hazmat.primitives.asymmetric import ec 

32from .certificate_v1 import HSMCertificate 

33from .utils import is_nonempty_hex_string 

34from sgx.envelope import SgxQuote, SgxReportBody 

35 

36 

37class HSMCertificateV2Element: 

38 def __init__(self): 

39 raise NotImplementedError("Cannot instantiate a HSMCertificateV2Element") 

40 

41 @classmethod 

42 def from_dict(cls, element_map): 

43 if element_map.get("type") not in cls.TYPE_MAPPING: 

44 raise ValueError("Invalid or missing element type for " 

45 f"element {element_map.get('name')}") 

46 

47 return cls.TYPE_MAPPING[element_map["type"]](element_map) 

48 

49 def _init_with_map(self, element_map): 

50 if "name" not in element_map: 

51 raise ValueError("Missing name for HSM certificate element") 

52 

53 self._name = element_map["name"] 

54 

55 if "signed_by" not in element_map: 

56 raise ValueError("Missing certifier for HSM certificate element") 

57 self._signed_by = element_map["signed_by"] 

58 

59 @property 

60 def name(self): 

61 return self._name 

62 

63 @property 

64 def signed_by(self): 

65 return self._signed_by 

66 

67 def get_value(self): 

68 raise NotImplementedError(f"{type(self).__name__} can't provide a value") 

69 

70 def get_pubkey(self): 

71 raise NotImplementedError(f"{type(self).__name__} can't provide a public key") 

72 

73 def is_valid(self, certifier): 

74 raise NotImplementedError(f"{type(self).__name__} can't be queried for validity") 

75 

76 def get_tweak(self): 

77 return None 

78 

79 

80class HSMCertificateV2ElementSGXQuote(HSMCertificateV2Element): 

81 def __init__(self, element_map): 

82 self._init_with_map(element_map) 

83 

84 def _init_with_map(self, element_map): 

85 super()._init_with_map(element_map) 

86 

87 if not is_nonempty_hex_string(element_map.get("message")): 

88 raise ValueError(f"Invalid message for HSM certificate element {self.name}") 

89 self._message = bytes.fromhex(element_map["message"]) 

90 

91 if not is_nonempty_hex_string(element_map.get("custom_data")): 

92 raise ValueError("Invalid custom data for HSM certificate " 

93 f"element {self.name}") 

94 self._custom_data = bytes.fromhex(element_map["custom_data"]) 

95 

96 if not is_nonempty_hex_string(element_map.get("signature")): 

97 raise ValueError("Invalid signature for HSM certificate element {self.name}") 

98 self._signature = bytes.fromhex(element_map["signature"]) 

99 

100 @property 

101 def message(self): 

102 return SgxQuote(self._message) 

103 

104 @property 

105 def custom_data(self): 

106 return self._custom_data.hex() 

107 

108 @property 

109 def signature(self): 

110 return self._signature.hex() 

111 

112 def is_valid(self, certifier): 

113 try: 

114 # Validate custom data 

115 expected = hashlib.sha256(self._custom_data).digest() 

116 if expected != self.message.report_body.report_data.field[:len(expected)]: 

117 return False 

118 

119 # Verify signature against the certifier 

120 return certifier.get_pubkey().verify_digest( 

121 self._signature, 

122 hashlib.sha256(self._message).digest(), 

123 ecdsa.util.sigdecode_der, 

124 ) 

125 except Exception: 

126 return False 

127 

128 def get_value(self): 

129 return { 

130 "sgx_quote": self.message, 

131 "message": self.custom_data, 

132 } 

133 

134 def to_dict(self): 

135 return { 

136 "name": self.name, 

137 "type": "sgx_quote", 

138 "message": self._message.hex(), 

139 "custom_data": self.custom_data, 

140 "signature": self.signature, 

141 "signed_by": self.signed_by, 

142 } 

143 

144 

145class HSMCertificateV2ElementSGXAttestationKey(HSMCertificateV2Element): 

146 def __init__(self, element_map): 

147 self._init_with_map(element_map) 

148 

149 def _init_with_map(self, element_map): 

150 super()._init_with_map(element_map) 

151 

152 if not is_nonempty_hex_string(element_map.get("message")): 

153 raise ValueError(f"Invalid message for HSM certificate element {self.name}") 

154 self._message = bytes.fromhex(element_map["message"]) 

155 

156 if not is_nonempty_hex_string(element_map.get("key")): 

157 raise ValueError(f"Invalid key for HSM certificate element {self.name}") 

158 self._key = bytes.fromhex(element_map["key"]) 

159 

160 if not is_nonempty_hex_string(element_map.get("auth_data")): 

161 raise ValueError(f"Invalid auth data for HSM certificate element {self.name}") 

162 self._auth_data = bytes.fromhex(element_map["auth_data"]) 

163 

164 if not is_nonempty_hex_string(element_map.get("signature")): 

165 raise ValueError(f"Invalid signature for HSM certificate element {self.name}") 

166 self._signature = bytes.fromhex(element_map["signature"]) 

167 

168 @property 

169 def message(self): 

170 return SgxReportBody(self._message) 

171 

172 @property 

173 def key(self): 

174 return ecdsa.VerifyingKey.from_string(self._key, ecdsa.NIST256p) 

175 

176 @property 

177 def auth_data(self): 

178 return self._auth_data.hex() 

179 

180 @property 

181 def signature(self): 

182 return self._signature.hex() 

183 

184 def is_valid(self, certifier): 

185 try: 

186 # Validate report data 

187 expected = hashlib.sha256(self.key.to_string() + self._auth_data).digest() 

188 if expected != self.message.report_data.field[:len(expected)]: 

189 return False 

190 

191 # Verify signature against the certifier 

192 return certifier.get_pubkey().verify_digest( 

193 self._signature, 

194 hashlib.sha256(self._message).digest(), 

195 ecdsa.util.sigdecode_der, 

196 ) 

197 except Exception: 

198 return False 

199 

200 def get_pubkey(self): 

201 return ecdsa.VerifyingKey.from_string(self._key, ecdsa.NIST256p) 

202 

203 def to_dict(self): 

204 return { 

205 "name": self.name, 

206 "type": "sgx_attestation_key", 

207 "message": self.message.get_raw_data().hex(), 

208 "key": self.key.to_string("uncompressed").hex(), 

209 "auth_data": self.auth_data, 

210 "signature": self.signature, 

211 "signed_by": self.signed_by, 

212 } 

213 

214 

215class HSMCertificateV2ElementX509(HSMCertificateV2Element): 

216 HEADER_BEGIN = "-----BEGIN CERTIFICATE-----" 

217 HEADER_END = "-----END CERTIFICATE-----" 

218 

219 @classmethod 

220 def from_pemfile(cls, pem_path, name, signed_by): 

221 return cls.from_pem(Path(pem_path).read_text(), name, signed_by) 

222 

223 @classmethod 

224 def from_pem(cls, pem_str, name, signed_by): 

225 return cls({ 

226 "name": name, 

227 "message": re.sub(r"[\s\n\r]+", " ", pem_str) 

228 .replace(cls.HEADER_END, "") 

229 .replace(cls.HEADER_BEGIN, "") 

230 .strip().encode(), 

231 "signed_by": signed_by, 

232 }) 

233 

234 def __init__(self, element_map): 

235 self._init_with_map(element_map) 

236 self._certificate = None 

237 

238 def _init_with_map(self, element_map): 

239 super()._init_with_map(element_map) 

240 

241 try: 

242 self._message = base64.b64decode(element_map.get("message")) 

243 except Exception: 

244 raise ValueError(f"Invalid message for HSM certificate element {self.name}") 

245 

246 @property 

247 def message(self): 

248 return base64.b64encode(self._message).decode("ASCII") 

249 

250 @property 

251 def certificate(self): 

252 if self._certificate is None: 

253 self._certificate = x509.load_pem_x509_certificate(( 

254 self.HEADER_BEGIN + self.message + self.HEADER_END).encode()) 

255 return self._certificate 

256 

257 def is_valid(self, certifier): 

258 try: 

259 # IMPORTANT: for now, we only allow verifying the validity of an 

260 # HSMCertificateV2ElementX509 using another HSMCertificateV2ElementX509 

261 # instance as certifier. That way, we simplify the validation procedure 

262 # and ensure maximum use of the underlying library's capabilities 

263 # (cryptography) 

264 if not isinstance(certifier, type(self)): 

265 return False 

266 

267 subject = self.certificate 

268 issuer = certifier.certificate 

269 now = datetime.now(UTC) 

270 

271 # 1. Check validity period 

272 if subject.not_valid_before_utc > now or subject.not_valid_after_utc < now: 

273 return False 

274 

275 # 2. Verify the signature 

276 issuer.public_key().verify( 

277 subject.signature, 

278 subject.tbs_certificate_bytes, 

279 ec.ECDSA(subject.signature_hash_algorithm) 

280 ) 

281 

282 return True 

283 

284 except Exception: 

285 return False 

286 

287 def get_pubkey(self): 

288 try: 

289 public_key = self.certificate.public_key() 

290 

291 if not isinstance(public_key.curve, ec.SECP256R1): 

292 raise ValueError("Certificate does not have a NIST P-256 public key") 

293 

294 public_bytes = public_key.public_bytes( 

295 Encoding.X962, PublicFormat.CompressedPoint) 

296 

297 return ecdsa.VerifyingKey.from_string(public_bytes, ecdsa.NIST256p) 

298 except Exception as e: 

299 raise ValueError(f"Error gathering public key from certificate: {str(e)}") 

300 

301 def to_dict(self): 

302 return { 

303 "name": self.name, 

304 "type": "x509_pem", 

305 "message": self.message, 

306 "signed_by": self.signed_by, 

307 } 

308 

309 

310# Element type mappings 

311HSMCertificateV2Element.TYPE_MAPPING = { 

312 "sgx_quote": HSMCertificateV2ElementSGXQuote, 

313 "sgx_attestation_key": HSMCertificateV2ElementSGXAttestationKey, 

314 "x509_pem": HSMCertificateV2ElementX509, 

315} 

316 

317 

318class HSMCertificateV2(HSMCertificate): 

319 VERSION = 2 

320 ROOT_ELEMENT = "sgx_root" 

321 ELEMENT_BASE_CLASS = HSMCertificateV2Element 

322 ELEMENT_FACTORY = HSMCertificateV2Element.from_dict