Coverage for admin/attestation_utils.py: 100%
66 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23import hashlib
24import json
25import re
26import secp256k1 as ec
27import requests
28from pathlib import Path
29from comm.cstruct import CStruct
30from .misc import AdminError
31from .certificate_v2 import HSMCertificateV2, HSMCertificateV2ElementX509
class PowHsmAttestationMessage(CStruct):
    """
    pow_hsm_message_header

    uint8_t platform 3
    uint8_t ud_value 32
    uint8_t public_keys_hash 32
    uint8_t best_block 32
    uint8_t last_signed_tx 8
    uint8_t timestamp 8
    """

    # NOTE(review): the docstring above looks like the field specification
    # that CStruct consumes to define `platform`, `timestamp`, etc. and to
    # compute get_bytelength() -- keep it byte-for-byte intact unless the
    # message layout itself changes (confirm against comm/cstruct.py).

    # Header is "POWHSM:5.<digit>::"; group 1 captures the version string.
    # The dot is escaped so that only a literal "." is accepted between the
    # major and minor version digits.
    HEADER_REGEX = re.compile(rb"^POWHSM:(5\.[0-9])::")

    @classmethod
    def is_header(cls, value):
        """Tell whether the given bytes start with a powHSM message header."""
        return cls.HEADER_REGEX.match(value) is not None

    def __init__(self, value, offset=0, little=True, name="powHSM"):
        """Parse a powHSM attestation message.

        Parameters:
            value: raw message bytes, starting with the message header
            offset: start position used for the length check and, shifted by
                the header length, for parsing the struct fields
            little: forwarded untouched to CStruct (presumably the
                endianness flag -- confirm against CStruct)
            name: label used in error messages

        Raises:
            ValueError: if the header is not recognised or the message
                length doesn't match header + struct size
        """
        self.name = name

        # Parse header
        match = self.HEADER_REGEX.match(value)
        if match is None:
            raise ValueError(
                f"Invalid {self.name} attestation message header: {value.hex()}")

        # Validate total length
        header_length = len(match.group(0))
        expected_length = header_length + self.get_bytelength()
        if len(value[offset:]) != expected_length:
            raise ValueError(f"{self.name} attestation message length "
                             f"mismatch: {value[offset:].hex()}")

        # Grab version
        self.version = match.group(1).decode("ASCII")

        # Parse the rest (the struct fields start right after the header)
        super().__init__(value, offset+header_length, little)

        # Conversions: platform becomes text, timestamp an unsigned
        # big-endian integer
        self.platform = self.platform.decode("ASCII")
        self.timestamp = int.from_bytes(self.timestamp, byteorder="big", signed=False)
def load_pubkeys(pubkeys_file_path):
    """Load the given public keys file into a map of path -> public key.

    The file must hold a JSON object whose keys are paths and whose values
    are hex-encoded public keys. Each value is parsed into an
    ``ec.PublicKey``.

    Parameters:
        pubkeys_file_path: location of the JSON public keys file

    Returns:
        A dict mapping each path found in the file to its parsed public key.

    Raises:
        AdminError: if the file is missing or isn't valid JSON, if its top
            level element is not an object, or if any of the public keys
            can't be parsed.
    """
    try:
        with open(pubkeys_file_path, "r") as file:
            pubkeys_map = json.load(file)

        if not isinstance(pubkeys_map, dict):
            raise AdminError(
                "Public keys file must contain an object as a top level element")

        result = {}
        for path, pubkey in pubkeys_map.items():
            try:
                result[path] = ec.PublicKey(bytes.fromhex(pubkey), raw=True)
            except Exception as e:
                # Whatever went wrong (bad hex, wrong type, invalid point),
                # report it as an invalid key while keeping the root cause
                raise AdminError(
                    f"Invalid public key for path {path}: {pubkey}") from e
        return result
    except (FileNotFoundError, ValueError, json.JSONDecodeError) as e:
        raise AdminError('Unable to read public keys from "%s": %s' %
                         (pubkeys_file_path, str(e))) from e
def compute_pubkeys_hash(pubkeys_map):
    """Compute the hash of the given public keys map.

    The result is the SHA-256 digest of the uncompressed serialization of
    every public key, concatenated in lexicographical path order.

    Raises:
        AdminError: if the map is empty
    """
    if not len(pubkeys_map):
        raise AdminError("Can't compute the hash of an empty public keys map")

    # Sorting the paths makes the digest independent of insertion order
    uncompressed_keys = (
        pubkeys_map[path].serialize(compressed=False)
        for path in sorted(pubkeys_map.keys())
    )
    return hashlib.sha256(b"".join(uncompressed_keys)).digest()
def compute_pubkeys_output(pubkeys_map):
    """Render the given public keys map as a list of aligned text lines.

    Each line has the form "<path>: <compressed public key in hex>", with
    the "<path>:" label left-justified to the width of the longest path so
    that the keys line up. Lines come in lexicographical path order.

    Returns:
        A list of strings, one per public key (empty for an empty map).
    """
    # default=0 keeps max() from raising ValueError on an empty map;
    # the extra 1 accounts for the trailing colon
    label_width = max(map(len, pubkeys_map.keys()), default=0) + 1
    return [
        f"{(path + ':').ljust(label_width)} "
        f"{pubkeys_map[path].serialize(compressed=True).hex()}"
        for path in sorted(pubkeys_map.keys())
    ]
def get_root_of_trust(path, timeout=30):
    """Load the root of trust certificate element from a file or a URL.

    If ``path`` points to an existing file, it is read as a PEM file.
    Otherwise it is assumed to be a URL and fetched with an HTTP GET.

    Parameters:
        path: local PEM file path or URL of the root of trust
        timeout: seconds to wait for the server when fetching from a URL,
            so that an unresponsive host can't block the caller forever

    Returns:
        The HSMCertificateV2ElementX509 built from the PEM contents, with
        HSMCertificateV2.ROOT_ELEMENT passed for both of the remaining
        arguments (presumably element name and signer -- confirm against
        HSMCertificateV2ElementX509).

    Raises:
        RuntimeError: if the URL fetch answers with a status other than 200
    """
    # From file
    if Path(path).is_file():
        return HSMCertificateV2ElementX509.from_pemfile(
            path,
            HSMCertificateV2.ROOT_ELEMENT,
            HSMCertificateV2.ROOT_ELEMENT)

    # Assume URL and try to grab it
    ra_res = requests.get(path, timeout=timeout)
    if ra_res.status_code != 200:
        raise RuntimeError(f"Error fetching root of trust from {path}")
    return HSMCertificateV2ElementX509.from_pem(
        ra_res.content.decode(),
        HSMCertificateV2.ROOT_ELEMENT,
        HSMCertificateV2.ROOT_ELEMENT)