Coverage for admin/certificate_v1.py: 99%
158 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23import json
24import hmac
25import secp256k1 as ec
26import hashlib
27from .utils import is_nonempty_hex_string
class HSMCertificateRoot:
    """Root of trust for a certificate chain, wrapping a single public key.

    Exposes the same ``get_pubkey()`` accessor that certificate elements
    offer, so it can be handed to ``HSMCertificateElement.is_valid`` as the
    certifier of the first element in a chain.
    """

    def __init__(self, raw_pubkey_hex):
        """Parse ``raw_pubkey_hex`` into a public key object.

        :param raw_pubkey_hex: hex string with the serialized public key
        :raises ValueError: if the key cannot be parsed for any reason
            (invalid hex, invalid key encoding, wrong argument type, ...)
        """
        try:
            key_bytes = bytes.fromhex(raw_pubkey_hex)
            parsed_key = ec.PublicKey(key_bytes, raw=True)
        except Exception:
            # Any failure whatsoever is reported as one uniform parsing error
            raise ValueError("Error parsing certificate root public key")
        self.pubkey = parsed_key

    def __repr__(self):
        """Return the uncompressed serialization of the key as a hex string."""
        uncompressed = self.pubkey.serialize(compressed=False)
        return uncompressed.hex()

    def get_pubkey(self):
        """Return the parsed public key object."""
        return self.pubkey
class HSMCertificateElement:
    """A single signed link of an HSM certificate chain.

    An element carries a name, the name of the element that signed it
    (``signed_by``), a hex encoded message, a hex encoded signature over
    that message and, optionally, a hex encoded tweak that is applied to the
    certifier's public key before verifying the signature.
    """

    # Names an element is allowed to have
    VALID_NAMES = ["device", "attestation", "ui", "signer"]

    # Per element name, how the element's value is carved out of the raw
    # message bytes
    EXTRACTORS = {
        # Trailing 65 bytes of the message
        # (presumably an uncompressed public key -- TODO confirm)
        "device": lambda b: b[-65:],
        # Everything but the first byte of the message
        "attestation": lambda b: b[1:],
        # The whole message
        "ui": lambda b: b[:],
        # The whole message
        "signer": lambda b: b[:],
    }

    def __init__(self, element_map):
        """Validate ``element_map`` and build the element from it.

        :param element_map: mapping with the keys ``name``, ``signed_by``,
            ``message`` and ``signature``, plus an optional ``tweak``
        :raises ValueError: if a mandatory key is missing, the name is not
            one of ``VALID_NAMES``, or the tweak, message or signature are
            rejected by ``is_nonempty_hex_string``
        """
        # The name goes first: it is part of the later error messages
        has_valid_name = ("name" in element_map
                          and element_map["name"] in self.VALID_NAMES)
        if not has_valid_name:
            raise ValueError("Missing or invalid name for HSM certificate element")
        self._name = element_map["name"]

        if "signed_by" not in element_map:
            raise ValueError("Missing certifier for HSM certificate element")
        self._signed_by = element_map["signed_by"]

        # The tweak is optional, but when given it must be well formed
        self._tweak = None
        if "tweak" in element_map:
            tweak_hex = element_map["tweak"]
            if not is_nonempty_hex_string(tweak_hex):
                raise ValueError(
                    f"Invalid signer tweak for HSM certificate element {self.name}")
            self._tweak = tweak_hex

        if not ("message" in element_map
                and is_nonempty_hex_string(element_map["message"])):
            raise ValueError(
                f"Missing or invalid message for HSM certificate element {self.name}")
        self._message = element_map["message"]

        if not ("signature" in element_map
                and is_nonempty_hex_string(element_map["signature"])):
            raise ValueError(
                f"Missing or invalid signature for HSM certificate element {self.name}")
        self._signature = element_map["signature"]

    @property
    def name(self):
        """Name of this element (one of ``VALID_NAMES``)."""
        return self._name

    @property
    def signed_by(self):
        """Name of the element that certifies this one."""
        return self._signed_by

    @property
    def tweak(self):
        """Hex encoded tweak, or ``None`` when the element has none."""
        return self._tweak

    @property
    def message(self):
        """Hex encoded signed message."""
        return self._message

    @property
    def signature(self):
        """Hex encoded signature over the message."""
        return self._signature

    def to_dict(self):
        """Return a dict with the element's fields.

        The ``tweak`` key is only present when the element has a tweak.
        """
        serialized = {
            "name": self.name,
            "message": self.message,
            "signature": self.signature,
            "signed_by": self.signed_by,
        }
        if self.tweak is None:
            return serialized

        serialized["tweak"] = self.tweak
        return serialized

    def is_valid(self, certifier):
        """Tell whether this element's signature verifies against ``certifier``.

        When the element has a tweak, the certifier's public key is first
        tweaked (``tweak_add``) with the HMAC-SHA256 of its uncompressed
        serialization, keyed with the tweak bytes.

        :param certifier: object exposing ``get_pubkey()``
        :returns: the outcome of the signature verification; ``False`` if
            anything at all raises along the way
        """
        try:
            signed_bytes = bytes.fromhex(self.message)
            base_pubkey = certifier.get_pubkey()

            if self.tweak is None:
                checking_pubkey = base_pubkey
            else:
                tweak_key = bytes.fromhex(self.tweak)
                tweak_digest = hmac.new(
                    tweak_key,
                    base_pubkey.serialize(compressed=False),
                    hashlib.sha256,
                ).digest()
                checking_pubkey = base_pubkey.tweak_add(tweak_digest)

            signature_bytes = bytes.fromhex(self.signature)
            parsed_signature = checking_pubkey.ecdsa_deserialize(signature_bytes)
            return checking_pubkey.ecdsa_verify(signed_bytes, parsed_signature)
        except Exception:
            # Deliberately broad: any failure means "not valid"
            return False

    def get_value(self):
        """Return, hex encoded, the portion of the message selected by the
        extractor registered for this element's name."""
        extract = self.EXTRACTORS[self.name]
        message_bytes = bytes.fromhex(self.message)
        return extract(message_bytes).hex()

    def get_pubkey(self):
        """Return the element's value parsed as a public key."""
        value_bytes = bytes.fromhex(self.get_value())
        return ec.PublicKey(value_bytes, raw=True)

    def get_tweak(self):
        """Return the element's tweak (``None`` if it has none)."""
        return self.tweak
class HSMCertificate:
    """A version 1 HSM certificate: a set of named, signed elements plus the
    list of target elements whose values are to be extracted.

    Each element names the element that signed it; an element signed by
    ``ROOT_ELEMENT`` is certified directly by the root of trust passed to
    ``validate_and_get_values``.
    """

    VERSION = 1  # Only supported version
    ROOT_ELEMENT = "root"
    ELEMENT_BASE_CLASS = HSMCertificateElement
    ELEMENT_FACTORY = HSMCertificateElement

    @classmethod
    def from_jsonfile(cls, path):
        """Load a certificate from the JSON file at ``path``.

        The concrete class to instantiate is looked up by the file's
        ``version`` field in ``cls.VERSION_MAPPING``.
        NOTE(review): ``VERSION_MAPPING`` is not defined in this class body;
        it is presumably attached elsewhere -- confirm.

        :param path: path of the JSON file to read
        :returns: whatever ``VERSION_MAPPING[version]`` builds from the
            parsed JSON object
        :raises ValueError: if the file is not valid JSON, its top level
            element is not an object, the version is unsupported, or
            building the certificate fails with a ``ValueError``
        """
        try:
            with open(path, "r") as file:
                certificate_map = json.load(file)

            if not isinstance(certificate_map, dict):
                raise ValueError(
                    "Certificate file must contain an object as a top level element")

            version = certificate_map.get("version")
            try:
                factory = cls.VERSION_MAPPING[version]
            except (KeyError, TypeError):
                # KeyError: unknown version. TypeError: unhashable version
                # (e.g. a JSON array or object). Keys are stringified one by
                # one because str.join() rejects non-string items.
                supported = ", ".join(map(str, cls.VERSION_MAPPING))
                raise ValueError("Invalid or unsupported HSM certificate "
                                 f"version {version} (supported versions are "
                                 f"{supported})") from None

            return factory(certificate_map)
        except ValueError as e:
            # json.JSONDecodeError is a ValueError subclass, so malformed
            # JSON is reported through this same path
            raise ValueError('Unable to read HSM certificate from "%s": %s' %
                             (path, str(e))) from e

    def __init__(self, certificate_map=None):
        """Create an empty certificate, or parse one from ``certificate_map``.

        :param certificate_map: optional dict describing the certificate
        :raises ValueError: if ``certificate_map`` is given and is invalid
        """
        self._targets = []
        self._elements = {}

        if certificate_map is not None:
            self._parse(certificate_map)

    def validate_and_get_values(self, root_of_trust):
        """Validate the certification chain of every target.

        :param root_of_trust: certifier (object exposing ``get_pubkey()``)
            for the elements signed by ``ROOT_ELEMENT``
        :returns: dict mapping each target name to either
            ``(True, value, tweak)`` when its whole chain is valid, or
            ``(False, name)`` with the name of the first element (walking
            from the root towards the target) that failed validation
        :raises ValueError: if a target's chain of signers loops back on
            itself and therefore never reaches the root
        :raises KeyError: if a target or one of its signers is not among
            the certificate's elements
        """
        result = {}
        for target in self._targets:
            # Build the chain from the target to the root. Elements added
            # through add_element() never went through the checks _parse()
            # performs, so guard against signer cycles here as well.
            chain = []
            seen = {target}
            current = self._elements[target]
            while current.signed_by != self.ROOT_ELEMENT:
                signer = current.signed_by
                if signer in seen:
                    raise ValueError(
                        f"Target {target} has not got a path to the root authority")
                seen.add(signer)
                chain.append(current)
                current = self._elements[signer]

            # Validate the chain from root to leaf
            # If valid, record True, the value of the leaf and its tweak
            # If not valid, record False and the name of the element that
            # failed the validation
            current_certifier = root_of_trust
            while True:
                # Validate this element
                if not current.is_valid(current_certifier):
                    result[target] = (False, current.name)
                    break
                # Reached the leaf? => valid!
                if not chain:
                    result[target] = (True, current.get_value(), current.get_tweak())
                    break

                current_certifier = current
                current = chain.pop()

        return result

    def add_element(self, element):
        """Add ``element`` to the certificate, replacing any element that
        already has the same name.

        :raises ValueError: if ``element`` is not an instance of
            ``ELEMENT_BASE_CLASS``
        """
        if not isinstance(element, self.ELEMENT_BASE_CLASS):
            raise ValueError(f"Expected an {self.ELEMENT_BASE_CLASS.__name__} "
                             f"but got a {type(element)}")
        self._elements[element.name] = element

    def clear_targets(self):
        """Remove all targets (elements are left untouched)."""
        self._targets = []

    def add_target(self, target):
        """Append ``target`` to the list of targets.

        :raises ValueError: if no element with that name exists
        """
        if target not in self._elements:
            raise ValueError(f"Target {target} not in elements")
        self._targets.append(target)

    def to_dict(self):
        """Return a dict with the certificate's version, targets and
        elements (each element as given by its own ``to_dict()``)."""
        return {
            "version": self.VERSION,
            "targets": self._targets,
            "elements": [element.to_dict() for element in self._elements.values()],
        }

    def save_to_jsonfile(self, path):
        """Write the certificate to ``path`` as indented JSON followed by a
        newline."""
        with open(path, "w") as file:
            file.write("%s\n" % json.dumps(self.to_dict(), indent=2))

    def _parse(self, certificate_map):
        """Populate targets and elements from ``certificate_map``.

        :raises ValueError: on a version mismatch, missing or invalid
            targets, missing elements, an invalid element, a target that is
            not an element, a signer that is not an element, or a target
            whose chain of signers never reaches ``ROOT_ELEMENT``
        """
        version = certificate_map.get("version")
        if version != self.VERSION:
            raise ValueError("Invalid or unexpected HSM certificate version "
                             f"{version} (expected {self.VERSION})")

        if not isinstance(certificate_map.get("targets"), list):
            raise ValueError("Missing or invalid targets")

        # Copy, so that add_target() never mutates the caller's list
        self._targets = list(certificate_map["targets"])

        if "elements" not in certificate_map:
            raise ValueError("Missing elements")

        for item in certificate_map["elements"]:
            element = self.ELEMENT_FACTORY(item)
            self._elements[item["name"]] = element

        # Sanity: check each target has a path to the root authority
        for target in self._targets:
            if target not in self._elements:
                raise ValueError(f"Target {target} not in elements")

            visited = set()
            current = self._elements[target]
            while True:
                # Seeing an element twice means the signers form a cycle
                if current.name in visited:
                    raise ValueError(
                        f"Target {target} has not got a path to the root authority")
                if current.signed_by == self.ROOT_ELEMENT:
                    break
                if current.signed_by not in self._elements:
                    raise ValueError(f"Signer {current.signed_by} not in elements")
                visited.add(current.name)
                current = self._elements[current.signed_by]