Coverage for admin/certificate.py: 99%

135 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-05 20:41 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23import json 

24import hmac 

25import secp256k1 as ec 

26import hashlib 

27from .utils import is_nonempty_hex_string 

28 

29 

30class HSMCertificate: 

31 VERSION = 1 # Only supported version 

32 ROOT_ELEMENT = "root" 

33 

34 @staticmethod 

35 def from_jsonfile(path): 

36 try: 

37 with open(path, "r") as file: 

38 certificate_map = json.loads(file.read()) 

39 

40 if type(certificate_map) != dict: 

41 raise ValueError( 

42 "JSON file must contain an object as a top level element") 

43 

44 return HSMCertificate(certificate_map) 

45 except (ValueError, json.JSONDecodeError) as e: 

46 raise ValueError('Unable to read HSM certificate from "%s": %s' % 

47 (path, str(e))) 

48 

49 def __init__(self, certificate_map=None): 

50 self._targets = [] 

51 self._elements = {} 

52 

53 if certificate_map is not None: 

54 self._parse(certificate_map) 

55 

56 def validate_and_get_values(self, raw_root_pubkey_hex): 

57 # Parse the root public key 

58 try: 

59 root_pubkey = ec.PublicKey(bytes.fromhex(raw_root_pubkey_hex), raw=True) 

60 except Exception: 

61 return dict([(target, (False, self.ROOT_ELEMENT)) 

62 for target in self._targets]) 

63 

64 result = {} 

65 for target in self._targets: 

66 # Build the chain from the target to the root 

67 chain = [] 

68 current = self._elements[target] 

69 while True: 

70 if current.signed_by == self.ROOT_ELEMENT: 

71 break 

72 chain.append(current) 

73 current = self._elements[current.signed_by] 

74 

75 # Validate the chain from root to leaf 

76 # If valid, return True and the value of the leaf 

77 # If not valid, return False and the name of the element that 

78 # failed the validation 

79 current_pubkey = root_pubkey 

80 while True: 

81 # Validate this element 

82 if not current.is_valid(current_pubkey): 

83 result[target] = (False, current.name) 

84 break 

85 # Reached the leaf? => valid! 

86 if len(chain) == 0: 

87 result[target] = (True, current.get_value(), current.tweak) 

88 break 

89 

90 current_pubkey = ec.PublicKey(bytes.fromhex(current.get_value()), 

91 raw=True) 

92 current = chain.pop() 

93 

94 return result 

95 

96 def add_element(self, element): 

97 if type(element) != HSMCertificateElement: 

98 raise ValueError( 

99 f"Expected an HSMCertificateElement but got a {type(element)}") 

100 self._elements[element.name] = element 

101 

102 def clear_targets(self): 

103 self._targets = [] 

104 

105 def add_target(self, target): 

106 if target not in self._elements: 

107 raise ValueError(f"Target {target} not in elements") 

108 self._targets.append(target) 

109 

110 def to_dict(self): 

111 return { 

112 "version": self.VERSION, 

113 "targets": self._targets, 

114 "elements": list(map(lambda e: e.to_dict(), self._elements.values())), 

115 } 

116 

117 def save_to_jsonfile(self, path): 

118 with open(path, "w") as file: 

119 file.write("%s\n" % json.dumps(self.to_dict(), indent=2)) 

120 

121 def _parse(self, certificate_map): 

122 if "version" not in certificate_map or certificate_map["version"] != self.VERSION: 

123 raise ValueError( 

124 "Invalid or unsupported HSM certificate version " 

125 f"(current version is {self.VERSION})" 

126 ) 

127 

128 if "targets" not in certificate_map or type(certificate_map["targets"]) != list: 

129 raise ValueError("Missing or invalid targets") 

130 

131 self._targets = certificate_map["targets"] 

132 

133 if "elements" not in certificate_map: 

134 raise ValueError("Missing elements") 

135 

136 for item in certificate_map["elements"]: 

137 element = HSMCertificateElement(item) 

138 self._elements[item["name"]] = element 

139 

140 # Sanity: check each target has a path to the root authority 

141 for target in self._targets: 

142 if target not in self._elements: 

143 raise ValueError(f"Target {target} not in elements") 

144 

145 visited = [] 

146 current = self._elements[target] 

147 while True: 

148 if current.name in visited: 

149 raise ValueError( 

150 f"Target {target} has not got a path to the root authority") 

151 if current.signed_by == self.ROOT_ELEMENT: 

152 break 

153 if current.signed_by not in self._elements: 

154 raise ValueError(f"Signer {current.signed_by} not in elements") 

155 visited.append(current.name) 

156 current = self._elements[current.signed_by] 

157 

158 

159class HSMCertificateElement: 

160 VALID_NAMES = ["device", "attestation", "ui", "signer"] 

161 EXTRACTORS = { 

162 "device": lambda b: b[-65:], 

163 "attestation": lambda b: b[1:], 

164 "ui": lambda b: b[:], 

165 "signer": lambda b: b[:], 

166 } 

167 

168 def __init__(self, element_map): 

169 if ("name" not in element_map 

170 or element_map["name"] not in self.VALID_NAMES): 

171 raise ValueError("Missing or invalid name for HSM certificate element") 

172 self._name = element_map["name"] 

173 

174 if "signed_by" not in element_map: 

175 raise ValueError("Missing certifier for HSM certificate element") 

176 self._signed_by = element_map["signed_by"] 

177 

178 self._tweak = None 

179 if "tweak" in element_map: 

180 if not is_nonempty_hex_string(element_map["tweak"]): 

181 raise ValueError( 

182 f"Invalid signer tweak for HSM certificate element {self.name}") 

183 self._tweak = element_map["tweak"] 

184 

185 if "message" not in element_map or not is_nonempty_hex_string( 

186 element_map["message"]): 

187 raise ValueError( 

188 f"Missing or invalid message for HSM certificate element {self.name}") 

189 self._message = element_map["message"] 

190 

191 if "signature" not in element_map or not is_nonempty_hex_string( 

192 element_map["signature"]): 

193 raise ValueError( 

194 f"Missing or invalid signature for HSM certificate element {self.name}") 

195 self._signature = element_map["signature"] 

196 

197 @property 

198 def name(self): 

199 return self._name 

200 

201 @property 

202 def signed_by(self): 

203 return self._signed_by 

204 

205 @property 

206 def tweak(self): 

207 return self._tweak 

208 

209 @property 

210 def message(self): 

211 return self._message 

212 

213 @property 

214 def signature(self): 

215 return self._signature 

216 

217 def to_dict(self): 

218 result = { 

219 "name": self.name, 

220 "message": self.message, 

221 "signature": self.signature, 

222 "signed_by": self.signed_by, 

223 } 

224 

225 if self.tweak is not None: 

226 result["tweak"] = self.tweak 

227 

228 return result 

229 

230 def is_valid(self, certifier_pubkey): 

231 try: 

232 message = bytes.fromhex(self.message) 

233 

234 verifier_pubkey = certifier_pubkey 

235 if self.tweak is not None: 

236 tweak = hmac.new( 

237 bytes.fromhex(self.tweak), 

238 certifier_pubkey.serialize(compressed=False), 

239 hashlib.sha256, 

240 ).digest() 

241 

242 verifier_pubkey = verifier_pubkey.tweak_add(tweak) 

243 

244 return verifier_pubkey.ecdsa_verify( 

245 message, verifier_pubkey.ecdsa_deserialize(bytes.fromhex(self.signature))) 

246 except Exception: 

247 return False 

248 

249 def get_value(self): 

250 return self.EXTRACTORS[self.name](bytes.fromhex(self.message)).hex()