Coverage for admin/certificate_v1.py: 99%

158 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23import json 

24import hmac 

25import secp256k1 as ec 

26import hashlib 

27from .utils import is_nonempty_hex_string 

28 

29 

30class HSMCertificateRoot: 

31 def __init__(self, raw_pubkey_hex): 

32 # Parse the public key 

33 try: 

34 self.pubkey = ec.PublicKey(bytes.fromhex(raw_pubkey_hex), raw=True) 

35 except Exception: 

36 raise ValueError("Error parsing certificate root public key") 

37 

38 def __repr__(self): 

39 return self.pubkey.serialize(compressed=False).hex() 

40 

41 def get_pubkey(self): 

42 return self.pubkey 

43 

44 

45class HSMCertificateElement: 

46 VALID_NAMES = ["device", "attestation", "ui", "signer"] 

47 EXTRACTORS = { 

48 "device": lambda b: b[-65:], 

49 "attestation": lambda b: b[1:], 

50 "ui": lambda b: b[:], 

51 "signer": lambda b: b[:], 

52 } 

53 

54 def __init__(self, element_map): 

55 if ("name" not in element_map 

56 or element_map["name"] not in self.VALID_NAMES): 

57 raise ValueError("Missing or invalid name for HSM certificate element") 

58 self._name = element_map["name"] 

59 

60 if "signed_by" not in element_map: 

61 raise ValueError("Missing certifier for HSM certificate element") 

62 self._signed_by = element_map["signed_by"] 

63 

64 self._tweak = None 

65 if "tweak" in element_map: 

66 if not is_nonempty_hex_string(element_map["tweak"]): 

67 raise ValueError( 

68 f"Invalid signer tweak for HSM certificate element {self.name}") 

69 self._tweak = element_map["tweak"] 

70 

71 if "message" not in element_map or not is_nonempty_hex_string( 

72 element_map["message"]): 

73 raise ValueError( 

74 f"Missing or invalid message for HSM certificate element {self.name}") 

75 self._message = element_map["message"] 

76 

77 if "signature" not in element_map or not is_nonempty_hex_string( 

78 element_map["signature"]): 

79 raise ValueError( 

80 f"Missing or invalid signature for HSM certificate element {self.name}") 

81 self._signature = element_map["signature"] 

82 

83 @property 

84 def name(self): 

85 return self._name 

86 

87 @property 

88 def signed_by(self): 

89 return self._signed_by 

90 

91 @property 

92 def tweak(self): 

93 return self._tweak 

94 

95 @property 

96 def message(self): 

97 return self._message 

98 

99 @property 

100 def signature(self): 

101 return self._signature 

102 

103 def to_dict(self): 

104 result = { 

105 "name": self.name, 

106 "message": self.message, 

107 "signature": self.signature, 

108 "signed_by": self.signed_by, 

109 } 

110 

111 if self.tweak is not None: 

112 result["tweak"] = self.tweak 

113 

114 return result 

115 

116 def is_valid(self, certifier): 

117 try: 

118 message = bytes.fromhex(self.message) 

119 

120 certifier_pubkey = certifier.get_pubkey() 

121 verifier_pubkey = certifier_pubkey 

122 if self.tweak is not None: 

123 tweak = hmac.new( 

124 bytes.fromhex(self.tweak), 

125 certifier_pubkey.serialize(compressed=False), 

126 hashlib.sha256, 

127 ).digest() 

128 

129 verifier_pubkey = verifier_pubkey.tweak_add(tweak) 

130 

131 return verifier_pubkey.ecdsa_verify( 

132 message, verifier_pubkey.ecdsa_deserialize(bytes.fromhex(self.signature))) 

133 except Exception: 

134 return False 

135 

136 def get_value(self): 

137 return self.EXTRACTORS[self.name](bytes.fromhex(self.message)).hex() 

138 

139 def get_pubkey(self): 

140 return ec.PublicKey(bytes.fromhex(self.get_value()), raw=True) 

141 

142 def get_tweak(self): 

143 return self.tweak 

144 

145 

146class HSMCertificate: 

147 VERSION = 1 # Only supported version 

148 ROOT_ELEMENT = "root" 

149 ELEMENT_BASE_CLASS = HSMCertificateElement 

150 ELEMENT_FACTORY = HSMCertificateElement 

151 

152 @classmethod 

153 def from_jsonfile(cls, path): 

154 try: 

155 with open(path, "r") as file: 

156 certificate_map = json.loads(file.read()) 

157 

158 if type(certificate_map) != dict: 

159 raise ValueError( 

160 "Certificate file must contain an object as a top level element") 

161 

162 version = certificate_map.get("version") 

163 if version not in cls.VERSION_MAPPING: 

164 raise ValueError("Invalid or unsupported HSM certificate " 

165 f"version {version} (supported versions are " 

166 f"{", ".join(cls.VERSION_MAPPING.keys())})") 

167 

168 return cls.VERSION_MAPPING[version](certificate_map) 

169 except (ValueError, json.JSONDecodeError) as e: 

170 raise ValueError('Unable to read HSM certificate from "%s": %s' % 

171 (path, str(e))) 

172 

173 def __init__(self, certificate_map=None): 

174 self._targets = [] 

175 self._elements = {} 

176 

177 if certificate_map is not None: 

178 self._parse(certificate_map) 

179 

180 def validate_and_get_values(self, root_of_trust): 

181 result = {} 

182 for target in self._targets: 

183 # Build the chain from the target to the root 

184 chain = [] 

185 current = self._elements[target] 

186 while True: 

187 if current.signed_by == self.ROOT_ELEMENT: 

188 break 

189 chain.append(current) 

190 current = self._elements[current.signed_by] 

191 

192 # Validate the chain from root to leaf 

193 # If valid, return True and the value of the leaf 

194 # If not valid, return False and the name of the element that 

195 # failed the validation 

196 current_certifier = root_of_trust 

197 while True: 

198 # Validate this element 

199 if not current.is_valid(current_certifier): 

200 result[target] = (False, current.name) 

201 break 

202 # Reached the leaf? => valid! 

203 if len(chain) == 0: 

204 result[target] = (True, current.get_value(), current.get_tweak()) 

205 break 

206 

207 current_certifier = current 

208 current = chain.pop() 

209 

210 return result 

211 

212 def add_element(self, element): 

213 if not isinstance(element, self.ELEMENT_BASE_CLASS): 

214 raise ValueError(f"Expected an {self.ELEMENT_BASE_CLASS.__name__} " 

215 "but got a {type(element)}") 

216 self._elements[element.name] = element 

217 

218 def clear_targets(self): 

219 self._targets = [] 

220 

221 def add_target(self, target): 

222 if target not in self._elements: 

223 raise ValueError(f"Target {target} not in elements") 

224 self._targets.append(target) 

225 

226 def to_dict(self): 

227 return { 

228 "version": self.VERSION, 

229 "targets": self._targets, 

230 "elements": list(map(lambda e: e.to_dict(), self._elements.values())), 

231 } 

232 

233 def save_to_jsonfile(self, path): 

234 with open(path, "w") as file: 

235 file.write("%s\n" % json.dumps(self.to_dict(), indent=2)) 

236 

237 def _parse(self, certificate_map): 

238 version = certificate_map.get("version") 

239 if version != self.VERSION: 

240 raise ValueError("Invalid or unexpected HSM certificate version " 

241 f"{version} (expected {self.VERSION})") 

242 

243 if "targets" not in certificate_map or type(certificate_map["targets"]) != list: 

244 raise ValueError("Missing or invalid targets") 

245 

246 self._targets = certificate_map["targets"] 

247 

248 if "elements" not in certificate_map: 

249 raise ValueError("Missing elements") 

250 

251 for item in certificate_map["elements"]: 

252 element = self.ELEMENT_FACTORY(item) 

253 self._elements[item["name"]] = element 

254 

255 # Sanity: check each target has a path to the root authority 

256 for target in self._targets: 

257 if target not in self._elements: 

258 raise ValueError(f"Target {target} not in elements") 

259 

260 visited = [] 

261 current = self._elements[target] 

262 while True: 

263 if current.name in visited: 

264 raise ValueError( 

265 f"Target {target} has not got a path to the root authority") 

266 if current.signed_by == self.ROOT_ELEMENT: 

267 break 

268 if current.signed_by not in self._elements: 

269 raise ValueError(f"Signer {current.signed_by} not in elements") 

270 visited.append(current.name) 

271 current = self._elements[current.signed_by]