Coverage for admin/certificate_v1.py: 99%
164 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-10-30 06:22 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-10-30 06:22 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23import json
24import hmac
25import secp256k1 as ec
26import hashlib
27from .utils import is_nonempty_hex_string
class HSMCertificateRoot:
    """Root of trust of a certificate chain, wrapping a secp256k1 public key."""

    def __init__(self, raw_pubkey_hex):
        """Parse the root public key from its hex-encoded raw serialization.

        Args:
            raw_pubkey_hex: hex string with the serialized public key.

        Raises:
            ValueError: if the hex string or the key it encodes cannot be parsed.
        """
        try:
            pubkey_bytes = bytes.fromhex(raw_pubkey_hex)
            parsed_pubkey = ec.PublicKey(pubkey_bytes, raw=True)
        except Exception:
            # Any failure (bad hex, bad key) is reported as the same ValueError
            raise ValueError("Error parsing certificate root public key")

        self.pubkey = parsed_pubkey

    def __repr__(self):
        """Return the hex encoding of the uncompressed serialized public key."""
        uncompressed = self.pubkey.serialize(compressed=False)
        return uncompressed.hex()

    def get_pubkey(self):
        """Return the parsed public key object."""
        return self.pubkey
class HSMCertificateElement:
    """A single signed element of an HSM certificate.

    An element is built from a map with the keys "name", "signed_by",
    "message" and "signature", plus an optional "tweak". The message,
    signature and tweak are kept as hex strings.
    """

    # Names an element is allowed to have
    VALID_NAMES = ["device", "attestation", "ui", "signer"]
    # Per-name function picking the element's value out of the raw message bytes:
    # the last 65 bytes for "device", everything but the first byte for
    # "attestation", and the entire message for "ui" and "signer"
    EXTRACTORS = {
        "device": lambda b: b[-65:],
        "attestation": lambda b: b[1:],
        "ui": lambda b: b[:],
        "signer": lambda b: b[:],
    }

    def __init__(self, element_map):
        """Validate the given element map and store its fields.

        Raises:
            ValueError: if the name is missing or not one of VALID_NAMES, if the
                certifier ("signed_by") is missing, if a given tweak is not a
                nonempty hex string, or if the message or signature is missing
                or not a nonempty hex string.
        """
        name_is_valid = ("name" in element_map
                         and element_map["name"] in self.VALID_NAMES)
        if not name_is_valid:
            raise ValueError("Missing or invalid name for HSM certificate element")
        self._name = element_map["name"]

        if "signed_by" not in element_map:
            raise ValueError("Missing certifier for HSM certificate element")
        self._signed_by = element_map["signed_by"]

        # The tweak is optional, but if present it must be well formed
        has_tweak = "tweak" in element_map
        if has_tweak and not is_nonempty_hex_string(element_map["tweak"]):
            raise ValueError(
                f"Invalid signer tweak for HSM certificate element {self.name}")
        self._tweak = element_map["tweak"] if has_tweak else None

        has_message = "message" in element_map
        if not (has_message and is_nonempty_hex_string(element_map["message"])):
            raise ValueError(
                f"Missing or invalid message for HSM certificate element {self.name}")
        self._message = element_map["message"]

        has_signature = "signature" in element_map
        if not (has_signature and is_nonempty_hex_string(element_map["signature"])):
            raise ValueError(
                f"Missing or invalid signature for HSM certificate element {self.name}")
        self._signature = element_map["signature"]

    @property
    def name(self):
        """Name of this element."""
        return self._name

    @property
    def signed_by(self):
        """Name of the certifier of this element."""
        return self._signed_by

    @property
    def tweak(self):
        """Hex string tweak, or None if the element has no tweak."""
        return self._tweak

    @property
    def message(self):
        """Signed message as a hex string."""
        return self._message

    @property
    def signature(self):
        """Signature over the message as a hex string."""
        return self._signature

    def to_dict(self):
        """Return the element as a plain dict; "tweak" is included only if set."""
        always_present = ("name", "message", "signature", "signed_by")
        serialized = {field: getattr(self, field) for field in always_present}

        if self.tweak is not None:
            serialized["tweak"] = self.tweak

        return serialized

    def is_valid(self, certifier):
        """Tell whether the signature verifies against the certifier's public key.

        If the element has a tweak, the verification key is the certifier's key
        tweak-added with HMAC-SHA256(key=tweak bytes, msg=uncompressed certifier
        public key). Any error during verification yields False.
        """
        try:
            message_bytes = bytes.fromhex(self.message)
            signer_key = certifier.get_pubkey()

            if self.tweak is None:
                verifying_key = signer_key
            else:
                tweak_digest = hmac.new(
                    key=bytes.fromhex(self.tweak),
                    msg=signer_key.serialize(compressed=False),
                    digestmod=hashlib.sha256,
                ).digest()
                verifying_key = signer_key.tweak_add(tweak_digest)

            parsed_signature = verifying_key.ecdsa_deserialize(
                bytes.fromhex(self.signature))
            return verifying_key.ecdsa_verify(message_bytes, parsed_signature)
        except Exception:
            # Malformed input or a failing library call counts as invalid
            return False

    def get_value(self):
        """Return, as a hex string, the value extracted from the message."""
        extract = self.EXTRACTORS[self.name]
        raw_message = bytes.fromhex(self.message)
        return extract(raw_message).hex()

    def get_pubkey(self):
        """Return the element's extracted value parsed as a raw public key."""
        value_bytes = bytes.fromhex(self.get_value())
        return ec.PublicKey(value_bytes, raw=True)

    def get_tweak(self):
        """Return the tweak (same as the tweak property)."""
        return self.tweak

    def get_collateral(self):
        """Return the collateral of this element; this element type has none."""
        return None
class HSMCertificate:
    """Version 1 HSM certificate: named signed elements plus validation targets.

    Elements are stored by name. Each element names its certifier through
    signed_by, which is either another element or ROOT_ELEMENT. Targets are the
    element names whose chains are validated by validate_and_get_values.
    """

    VERSION = 1  # Only supported version
    ROOT_ELEMENT = "root"
    # Type that add_element requires its argument to be an instance of
    ELEMENT_BASE_CLASS = HSMCertificateElement
    # Callable used by _parse to build an element from its map
    ELEMENT_FACTORY = HSMCertificateElement

    @classmethod
    def from_jsonfile(cls, path):
        """Load a certificate from a JSON file, dispatching on its version.

        The class used to build the certificate is looked up in
        cls.VERSION_MAPPING by the file's "version" value. VERSION_MAPPING is
        not defined in this class; it is expected to be provided elsewhere.

        Raises:
            ValueError: if the file is not valid JSON, its top level element is
                not an object, its version is not supported, or building the
                certificate fails with a ValueError.
        """
        try:
            with open(path, "r") as file:
                certificate_map = json.loads(file.read())

            if not isinstance(certificate_map, dict):
                raise ValueError(
                    "Certificate file must contain an object as a top level element")

            version = certificate_map.get("version")
            try:
                is_supported = version in cls.VERSION_MAPPING
            except TypeError:
                # Unhashable JSON values (lists, objects) are never a valid version
                is_supported = False

            if not is_supported:
                # Keys may not be strings (e.g. integer versions), so convert
                # them before joining
                supported = ", ".join(map(str, cls.VERSION_MAPPING.keys()))
                raise ValueError("Invalid or unsupported HSM certificate "
                                 f"version {version} (supported versions are "
                                 f"{supported})")

            return cls.VERSION_MAPPING[version](certificate_map)
        except (ValueError, json.JSONDecodeError) as e:
            raise ValueError('Unable to read HSM certificate from "%s": %s' %
                             (path, str(e))) from e

    def __init__(self, certificate_map=None):
        """Create an empty certificate, or parse the given certificate map.

        Raises:
            ValueError: if a certificate map is given and it is invalid.
        """
        self._targets = []
        self._elements = {}

        if certificate_map is not None:
            self._parse(certificate_map)

    def validate_and_get_values(self, root_of_trust):
        """Validate the chain of every target against the given root of trust.

        Returns a dict keyed by target name. A valid target maps to
        {"valid": True, "value", "tweak", "collateral"}, where value and tweak
        come from the target element and collateral gathers the non-None
        collateral of every element in its chain. An invalid target maps to
        {"valid": False, "failed_element": <name of the failing element>}.

        NOTE: this assumes every target's signed_by chain ends at ROOT_ELEMENT.
        _parse checks that, but elements and targets added through add_element
        and add_target are not checked for it.
        """
        result = {}
        for target in self._targets:
            # Build the chain from the target to the root. When the loop ends,
            # current is the element signed by the root and chain holds the
            # remaining elements, with the target first.
            chain = []
            current = self._elements[target]
            while current.signed_by != self.ROOT_ELEMENT:
                chain.append(current)
                current = self._elements[current.signed_by]

            # Validate the chain from the root down to the target
            current_certifier = root_of_trust
            collateral = {}
            while True:
                # Validate this element
                if not current.is_valid(current_certifier):
                    result[target] = {
                        "valid": False,
                        "failed_element": current.name,
                    }
                    break
                # Add collateral from current element
                current_collateral = current.get_collateral()
                if current_collateral is not None:
                    collateral[current.name] = current_collateral
                # Reached the target? => valid!
                if not chain:
                    result[target] = {
                        "valid": True,
                        "value": current.get_value(),
                        "tweak": current.get_tweak(),
                        "collateral": collateral,
                    }
                    break

                # The element just validated certifies the next one down
                current_certifier = current
                current = chain.pop()

        return result

    def add_element(self, element):
        """Add an element, replacing any existing element with the same name.

        Raises:
            ValueError: if element is not an instance of ELEMENT_BASE_CLASS.
        """
        if not isinstance(element, self.ELEMENT_BASE_CLASS):
            raise ValueError(f"Expected an {self.ELEMENT_BASE_CLASS.__name__} "
                             f"but got a {type(element)}")
        self._elements[element.name] = element

    def clear_targets(self):
        """Remove all targets."""
        self._targets = []

    def add_target(self, target):
        """Add a target by element name.

        Raises:
            ValueError: if there is no element with that name.
        """
        if target not in self._elements:
            raise ValueError(f"Target {target} not in elements")
        self._targets.append(target)

    def to_dict(self):
        """Return the certificate as a dict with version, targets and elements."""
        return {
            "version": self.VERSION,
            "targets": self._targets,
            "elements": [element.to_dict() for element in self._elements.values()],
        }

    def save_to_jsonfile(self, path):
        """Write the certificate to the given path as indented JSON."""
        with open(path, "w") as file:
            file.write("%s\n" % json.dumps(self.to_dict(), indent=2))

    def _parse(self, certificate_map):
        """Load version, targets and elements from the given certificate map.

        Raises:
            ValueError: if the version is not VERSION, the targets are missing
                or not a list, the elements are missing, an element is invalid,
                or a target is unknown or has no path to the root authority.
        """
        version = certificate_map.get("version")
        if version != self.VERSION:
            raise ValueError("Invalid or unexpected HSM certificate version "
                             f"{version} (expected {self.VERSION})")

        if ("targets" not in certificate_map
                or not isinstance(certificate_map["targets"], list)):
            raise ValueError("Missing or invalid targets")

        self._targets = certificate_map["targets"]

        if "elements" not in certificate_map:
            raise ValueError("Missing elements")

        for item in certificate_map["elements"]:
            element = self.ELEMENT_FACTORY(item)
            self._elements[item["name"]] = element

        # Sanity: check each target has a path to the root authority
        for target in self._targets:
            if target not in self._elements:
                raise ValueError(f"Target {target} not in elements")

            # Names already walked, used to detect signing cycles
            visited = set()
            current = self._elements[target]
            while True:
                if current.name in visited:
                    raise ValueError(
                        f"Target {target} has not got a path to the root authority")
                if current.signed_by == self.ROOT_ELEMENT:
                    break
                if current.signed_by not in self._elements:
                    raise ValueError(f"Signer {current.signed_by} not in elements")
                visited.add(current.name)
                current = self._elements[current.signed_by]