Coverage for sgx/hsm2dongle.py: 100%
89 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23from enum import IntEnum
24from ledger.hsm2dongle import HSM2DongleError
25from ledger.hsm2dongle_tcp import HSM2DongleTCP
class SgxCommand(IntEnum):
    """Command bytes for the SGX-specific operations sent by HSM2DongleSGX."""

    # NOTE: values are plain ints. A trailing comma after a value would turn
    # it into a one-element tuple, which IntEnum only tolerates by unpacking
    # it as arguments to int().
    SGX_ONBOARD = 0xA0
    SGX_RETRIES = 0xA2
    SGX_UNLOCK = 0xA3
    SGX_ECHO = 0xA4
    SGX_CHANGE_PASSWORD = 0xA5
    SGX_UPGRADE = 0xA6
class SgxUpgradeOps(IntEnum):
    """Operation bytes sent as the first payload byte of SGX_UPGRADE commands."""

    # NOTE: values are plain ints; no trailing commas, which would make each
    # value a one-element tuple.
    START = 0x01
    SPEC_SIG = 0x02
    IDENTIFY_SELF = 0x03
    IDENTIFY_PEER = 0x04
    PROCESS_DATA = 0x05
class SgxUpgradeRoles(IntEnum):
    """Role values for the two sides of an SGX upgrade (exporter and importer)."""

    # NOTE: values are plain ints; no trailing commas, which would make each
    # value a one-element tuple.
    EXPORTER = 0x01
    IMPORTER = 0x02
# Number of bytes used to encode the total evidence length that prefixes the
# first chunk sent by HSM2DongleSGX.migrate_db_send_evidence (big-endian)
EVIDENCE_LEN_BYTES = 2
# Maximum number of evidence bytes sent per message by
# HSM2DongleSGX.migrate_db_send_evidence
EVIDENCE_CHUNK_SIZE = 80
class HSM2DongleSGX(HSM2DongleTCP):
    """HSM2DongleTCP extension implementing the SGX-specific commands.

    Adds echo, unlock, pin change, retries, onboarding and upgrade (migration)
    operations, and overrides chunked sending for the commands listed in
    SGX_SPECIFIC_COMMANDS.
    """

    # Echo message
    def echo(self):
        """Send a fixed three-byte message and check that it is echoed back.

        Returns:
            True if the response equals the CLA byte, the SGX_ECHO command
            byte and the message, in that order; False otherwise.
        """
        message = bytes([0x41, 0x42, 0x43])
        result = bytes(self._send_command(SgxCommand.SGX_ECHO, message))
        # Result should be the command plus the message
        expected_result = bytes([self.CLA, SgxCommand.SGX_ECHO]) + message
        return result == expected_result

    # Unlock the device with the given pin
    def unlock(self, pin):
        """Send the unlock command with the given pin (bytes).

        Returns:
            True if the third byte of the response is nonzero (device
            unlocked), False otherwise.
        """
        response = self._send_command(SgxCommand.SGX_UNLOCK, bytes([0]) + pin)

        # Nonzero indicates device unlocked
        return response[2] != 0

    # change pin
    def new_pin(self, pin):
        """Send the change-password command with the given pin (bytes).

        Returns:
            True if the third byte of the response is one (pin changed),
            False otherwise.
        """
        response = self._send_command(
            SgxCommand.SGX_CHANGE_PASSWORD, bytes([0]) + pin)

        # One indicates pin changed
        return response[2] == 1

    # returns the number of pin retries available
    def get_retries(self):
        """Return the number of pin retries available, as reported by the device."""
        apdu_rcv = self._send_command(SgxCommand.SGX_RETRIES)
        return apdu_rcv[2]

    # Attempt to onboard the device using the given seed and pin
    def onboard(self, seed, pin):
        """Onboard the device with the given seed and pin.

        Args:
            seed: bytes of length self.ONBOARDING.SEED_LENGTH.
            pin: bytes.

        Returns:
            True on success.

        Raises:
            HSM2DongleError: if the seed or pin are invalid, or if the third
                byte of the device response is not one.
        """
        if not isinstance(seed, bytes) or len(seed) != self.ONBOARDING.SEED_LENGTH:
            raise HSM2DongleError("Invalid seed given")

        if not isinstance(pin, bytes):
            raise HSM2DongleError("Invalid pin given")

        self.logger.info("Sending onboard command")
        response = self._send_command(
            SgxCommand.SGX_ONBOARD, bytes([0x0]) + seed + pin)

        if response[2] != 1:
            raise HSM2DongleError("Error onboarding. Got '%s'" % response.hex())

        return True

    # Migration operations
    def migrate_db_spec(self, role, source_mre, destination_mre, signatures):
        """Send the migration spec and role, followed by the spec signatures.

        Signatures are sent one at a time until a response has a zero third
        byte; remaining signatures are then not sent.

        Args:
            role: role value sent as a single byte (see SgxUpgradeRoles).
            source_mre: bytes appended after the role.
            destination_mre: bytes appended after source_mre.
            signatures: iterable of bytes, each sent in its own message.

        Raises:
            HSM2DongleError: if no response had a zero third byte, which
                includes the case of an empty signatures iterable.
        """
        # Send spec and role
        self._send_command(
            SgxCommand.SGX_UPGRADE,
            bytes([SgxUpgradeOps.START]) +
            bytes([role]) + source_mre + destination_mre)

        # Send signatures
        for signature in signatures:
            response = self._send_command(
                SgxCommand.SGX_UPGRADE,
                bytes([SgxUpgradeOps.SPEC_SIG]) + signature)

            if response[2] == 0:
                break
        else:
            # Reached when the loop never broke: either every response was
            # nonzero or there were no signatures to send at all
            raise HSM2DongleError("Not enough correct signatures gathered")

    def migrate_db_get_evidence(self):
        """Gather this device's evidence.

        Repeats the IDENTIFY_SELF operation, accumulating the response bytes
        from offset 3 onwards, until a response has a zero third byte.

        Returns:
            The accumulated evidence bytes.
        """
        evidence = bytes([])
        while True:
            response = self._send_command(
                SgxCommand.SGX_UPGRADE,
                bytes([SgxUpgradeOps.IDENTIFY_SELF]))

            evidence += response[3:]

            if response[2] == 0:
                break

        return evidence

    def migrate_db_send_evidence(self, evidence):
        """Send the peer's evidence to the device in chunks.

        The first message carries the total evidence length, encoded as
        EVIDENCE_LEN_BYTES big-endian bytes, ahead of the first chunk. Chunks
        hold at most EVIDENCE_CHUNK_SIZE bytes. Sending stops when a response
        has a zero third byte or the whole evidence has been sent.

        Args:
            evidence: bytes to send.

        Raises:
            HSM2DongleError: if the last response has a nonzero third byte.
            OverflowError: if len(evidence) does not fit in
                EVIDENCE_LEN_BYTES bytes (raised by int.to_bytes).
        """
        evlen = len(evidence).to_bytes(
            EVIDENCE_LEN_BYTES, byteorder="big", signed=False)
        offset = 0
        while True:
            response = self._send_command(
                SgxCommand.SGX_UPGRADE,
                bytes([SgxUpgradeOps.IDENTIFY_PEER]) +
                (evlen if offset == 0 else bytes([])) +
                evidence[offset:offset+EVIDENCE_CHUNK_SIZE])
            offset += EVIDENCE_CHUNK_SIZE
            if response[2] == 0 or offset >= len(evidence):
                break

        if response[2] != 0:
            raise HSM2DongleError("Failed to receive evidence ack")

    def migrate_db_get_data(self):
        """Request the migration data from the device.

        Returns:
            The response bytes from offset 3 onwards.

        Raises:
            HSM2DongleError: if the response carries no data bytes.
        """
        data = self._send_command(
            SgxCommand.SGX_UPGRADE,
            bytes([SgxUpgradeOps.PROCESS_DATA]))[3:]

        if len(data) == 0:
            raise HSM2DongleError("Migration data gathering failed. "
                                  "Expected data but got 0 bytes instead")

        return data

    def migrate_db_send_data(self, data):
        """Send the given migration data (bytes) to the device."""
        self._send_command(
            SgxCommand.SGX_UPGRADE,
            bytes([SgxUpgradeOps.PROCESS_DATA]) + data)

    # Standard commands that get SGX-specific handling in
    # _send_data_in_chunks (data is sent in full rather than in chunks)
    SGX_SPECIFIC_COMMANDS = [
        HSM2DongleTCP.CMD.ADVANCE, HSM2DongleTCP.CMD.UPD_ANCESTOR
    ]

    # Send a specific piece of data to the device.
    # Validate responses wrt current operation and next possible expected operations
    # Exceptions are to be handled by the caller
    def _send_data_in_chunks(
        self,
        command,
        operation,
        next_operations,
        data,
        expect_full_data,
        initial_bytes,
        operation_name,
        data_description,
    ):
        """Send data for the given command and operation.

        Commands not listed in SGX_SPECIFIC_COMMANDS are delegated unchanged
        to the parent implementation. For the listed commands the data is sent
        in a single message; expect_full_data, initial_bytes and
        data_description are not used on that path.

        Args:
            command: command to send.
            operation: current operation, sent as the first payload byte.
            next_operations: operations the device may request next.
            data: bytes to send.
            expect_full_data: forwarded to the parent implementation only.
            initial_bytes: forwarded to the parent implementation only.
            operation_name: name used in the error log message.
            data_description: forwarded to the parent implementation only.

        Returns:
            For SGX-specific commands, a (success, response) tuple. success is
            False when the operation requested by the device is not one of
            next_operations or is the current operation. Otherwise, whatever
            the parent implementation returns.
        """
        # Same old behavior for anything that hasn't got an SGX-specific
        # mapping
        if command not in self.SGX_SPECIFIC_COMMANDS:
            return super()._send_data_in_chunks(
                command,
                operation,
                next_operations,
                data, expect_full_data,
                initial_bytes,
                operation_name,
                data_description)

        # Send data in full
        response = self._send_command(command, bytes([operation]) + data)

        # We expect the device to ask for one of the next operations but
        # not the current chunk operation
        # If it doesn't happen, error out
        requested = response[self.OFF.OP]
        if requested not in next_operations or requested == operation:
            self.logger.debug(
                "Current operation %s, next operations %s, ledger requesting %s",
                hex(operation),
                str(list(map(hex, next_operations))),
                # Log the same byte that was just validated
                hex(requested),
            )
            self.logger.error(
                "%s: unexpected response %s",
                operation_name.capitalize(),
                response.hex(),
            )
            return (False, response)

        # All is good
        return (True, response)