Coverage for admin/certificate.py: 99%
135 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-05 20:41 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-05 20:41 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23import json
24import hmac
25import secp256k1 as ec
26import hashlib
27from .utils import is_nonempty_hex_string
class HSMCertificate:
    """A set of signed elements plus the names of the targets to validate.

    Every element names the element that signed it (or ROOT_ELEMENT); each
    target must therefore have a chain of signers that ends at the root.
    """

    VERSION = 1  # Only supported version
    ROOT_ELEMENT = "root"

    @staticmethod
    def from_jsonfile(path):
        """Load a certificate from the JSON file at `path`.

        Raises ValueError (chained to the underlying cause) if the file is not
        valid JSON, its top level element is not an object, or its contents do
        not describe a valid certificate.
        """
        try:
            with open(path, "r") as file:
                certificate_map = json.loads(file.read())

            if not isinstance(certificate_map, dict):
                raise ValueError(
                    "JSON file must contain an object as a top level element")

            return HSMCertificate(certificate_map)
        except ValueError as e:
            # json.JSONDecodeError is a ValueError subclass, so this single
            # clause also covers malformed JSON
            raise ValueError('Unable to read HSM certificate from "%s": %s' %
                             (path, str(e))) from e

    def __init__(self, certificate_map=None):
        """Create an empty certificate, or parse one from `certificate_map`.

        Raises ValueError if `certificate_map` is given and is invalid.
        """
        self._targets = []
        self._elements = {}

        if certificate_map is not None:
            self._parse(certificate_map)

    def validate_and_get_values(self, raw_root_pubkey_hex):
        """Validate every target against the given root public key.

        Returns a dict keyed by target name. A valid target maps to
        (True, <value of the target element>, <tweak of the target element>);
        an invalid one maps to (False, <name of the element that failed>).
        If the root public key itself cannot be parsed, every target maps to
        (False, ROOT_ELEMENT).
        """
        # Parse the root public key
        try:
            root_pubkey = ec.PublicKey(bytes.fromhex(raw_root_pubkey_hex), raw=True)
        except Exception:
            return {target: (False, self.ROOT_ELEMENT) for target in self._targets}

        result = {}
        for target in self._targets:
            # Build the chain from the target to the root.
            # Elements added through add_element() skip the sanity check done
            # when parsing, so guard against cycles and unknown signers here
            # instead of looping forever or failing with a KeyError.
            chain = []
            visited = set()
            broken_at = None
            current = self._elements[target]
            while current.signed_by != self.ROOT_ELEMENT:
                signer = current.signed_by
                if (current.name in visited or not isinstance(signer, str)
                        or signer not in self._elements):
                    broken_at = current.name
                    break
                visited.add(current.name)
                chain.append(current)
                current = self._elements[signer]

            if broken_at is not None:
                result[target] = (False, broken_at)
                continue

            # Validate the chain from root to leaf
            # If valid, return True and the value of the leaf
            # If not valid, return False and the name of the element that
            # failed the validation
            current_pubkey = root_pubkey
            while True:
                # Validate this element
                if not current.is_valid(current_pubkey):
                    result[target] = (False, current.name)
                    break
                # Reached the leaf? => valid!
                if not chain:
                    result[target] = (True, current.get_value(), current.tweak)
                    break

                # The value of a valid element is the key that signs the next one
                current_pubkey = ec.PublicKey(bytes.fromhex(current.get_value()),
                                              raw=True)
                current = chain.pop()

        return result

    def add_element(self, element):
        """Add (or replace, by name) an element of the certificate.

        Raises ValueError if `element` is not an HSMCertificateElement.
        """
        if not isinstance(element, HSMCertificateElement):
            raise ValueError(
                f"Expected an HSMCertificateElement but got a {type(element)}")
        self._elements[element.name] = element

    def clear_targets(self):
        """Forget all targets (elements are kept)."""
        self._targets = []

    def add_target(self, target):
        """Mark the element named `target` as a target.

        Raises ValueError if there is no element with that name.
        """
        if target not in self._elements:
            raise ValueError(f"Target {target} not in elements")
        self._targets.append(target)

    def to_dict(self):
        """Return the certificate as a JSON-serializable dict."""
        return {
            "version": self.VERSION,
            # Hand out a copy so callers cannot alter the internal target list
            "targets": list(self._targets),
            "elements": [element.to_dict() for element in self._elements.values()],
        }

    def save_to_jsonfile(self, path):
        """Write the certificate as indented JSON to the file at `path`."""
        with open(path, "w") as file:
            file.write("%s\n" % json.dumps(self.to_dict(), indent=2))

    def _parse(self, certificate_map):
        """Populate targets and elements from `certificate_map`.

        Raises ValueError on an unsupported version, on missing or malformed
        targets or elements, and on any target lacking a path to the root.
        """
        if "version" not in certificate_map or certificate_map["version"] != self.VERSION:
            raise ValueError(
                "Invalid or unsupported HSM certificate version "
                f"(current version is {self.VERSION})"
            )

        if "targets" not in certificate_map or \
                not isinstance(certificate_map["targets"], list):
            raise ValueError("Missing or invalid targets")

        # Copy so that later add_target() calls do not mutate the caller's map
        self._targets = list(certificate_map["targets"])

        if "elements" not in certificate_map:
            raise ValueError("Missing elements")

        if not isinstance(certificate_map["elements"], list):
            raise ValueError("Invalid elements")

        for item in certificate_map["elements"]:
            if not isinstance(item, dict):
                raise ValueError("Invalid HSM certificate element: expected an object")
            element = HSMCertificateElement(item)
            self._elements[element.name] = element

        # Sanity: check each target has a path to the root authority
        for target in self._targets:
            # The type checks keep unhashable values from raising TypeError
            # on the dictionary lookups
            if not isinstance(target, str) or target not in self._elements:
                raise ValueError(f"Target {target} not in elements")

            visited = []
            current = self._elements[target]
            while True:
                if current.name in visited:
                    raise ValueError(
                        f"Target {target} has not got a path to the root authority")
                if current.signed_by == self.ROOT_ELEMENT:
                    break
                if not isinstance(current.signed_by, str) or \
                        current.signed_by not in self._elements:
                    raise ValueError(f"Signer {current.signed_by} not in elements")
                visited.append(current.name)
                current = self._elements[current.signed_by]
class HSMCertificateElement:
    """A single signed element of an HSM certificate.

    Holds the element name, the name of its signer, a hex-encoded message,
    a hex-encoded signature over that message and an optional hex-encoded
    tweak that is applied to the signer's public key before verifying.
    """

    VALID_NAMES = ["device", "attestation", "ui", "signer"]

    # Per element name, how the element's value is carved out of the raw
    # message bytes: "device" keeps the last 65 bytes, "attestation" drops the
    # first byte, "ui" and "signer" use the whole message.
    EXTRACTORS = {
        "device": lambda b: b[-65:],
        "attestation": lambda b: b[1:],
        "ui": lambda b: b[:],
        "signer": lambda b: b[:],
    }

    def __init__(self, element_map):
        """Build an element from its dict representation.

        Required keys are "name" (one of VALID_NAMES), "signed_by", "message"
        and "signature"; "tweak" is optional.

        Raises ValueError if `element_map` is not a dict or any field is
        missing or invalid.
        """
        # Reject non-dict input up front: otherwise the membership tests and
        # lookups below fail with TypeError, or silently perform substring or
        # list membership checks on strings and lists
        if not isinstance(element_map, dict):
            raise ValueError("Invalid HSM certificate element: expected a map")

        if ("name" not in element_map
                or element_map["name"] not in self.VALID_NAMES):
            raise ValueError("Missing or invalid name for HSM certificate element")
        self._name = element_map["name"]

        if "signed_by" not in element_map:
            raise ValueError("Missing certifier for HSM certificate element")
        # The certifier is looked up by name in the certificate's elements,
        # so anything other than a string can never resolve
        if not isinstance(element_map["signed_by"], str):
            raise ValueError(
                f"Invalid certifier for HSM certificate element {self.name}")
        self._signed_by = element_map["signed_by"]

        self._tweak = None
        if "tweak" in element_map:
            if not is_nonempty_hex_string(element_map["tweak"]):
                raise ValueError(
                    f"Invalid signer tweak for HSM certificate element {self.name}")
            self._tweak = element_map["tweak"]

        if "message" not in element_map or not is_nonempty_hex_string(
                element_map["message"]):
            raise ValueError(
                f"Missing or invalid message for HSM certificate element {self.name}")
        self._message = element_map["message"]

        if "signature" not in element_map or not is_nonempty_hex_string(
                element_map["signature"]):
            raise ValueError(
                f"Missing or invalid signature for HSM certificate element {self.name}")
        self._signature = element_map["signature"]

    @property
    def name(self):
        """Name of this element (one of VALID_NAMES)."""
        return self._name

    @property
    def signed_by(self):
        """Name of the element (or root) that signed this element."""
        return self._signed_by

    @property
    def tweak(self):
        """Hex-encoded tweak, or None if the element has none."""
        return self._tweak

    @property
    def message(self):
        """Hex-encoded signed message."""
        return self._message

    @property
    def signature(self):
        """Hex-encoded signature over the message."""
        return self._signature

    def to_dict(self):
        """Return the dict representation; "tweak" is included only if set."""
        result = {
            "name": self.name,
            "message": self.message,
            "signature": self.signature,
            "signed_by": self.signed_by,
        }

        if self.tweak is not None:
            result["tweak"] = self.tweak

        return result

    def is_valid(self, certifier_pubkey):
        """Tell whether the signature verifies against `certifier_pubkey`.

        If the element has a tweak, the verifying key is `certifier_pubkey`
        tweak-added with HMAC-SHA256(key=tweak bytes, msg=uncompressed
        serialization of `certifier_pubkey`).

        Returns False, rather than raising, on any error along the way.
        """
        try:
            message = bytes.fromhex(self.message)

            verifier_pubkey = certifier_pubkey
            if self.tweak is not None:
                tweak = hmac.new(
                    bytes.fromhex(self.tweak),
                    certifier_pubkey.serialize(compressed=False),
                    hashlib.sha256,
                ).digest()

                verifier_pubkey = verifier_pubkey.tweak_add(tweak)

            return verifier_pubkey.ecdsa_verify(
                message, verifier_pubkey.ecdsa_deserialize(bytes.fromhex(self.signature)))
        except Exception:
            # Deliberate: any parsing or verification failure means "not valid"
            return False

    def get_value(self):
        """Return the hex-encoded value extracted from the message.

        The extraction rule depends on the element name (see EXTRACTORS).
        """
        return self.EXTRACTORS[self.name](bytes.fromhex(self.message)).hex()