Coverage for tests/sgx/hsm2dongle_cmds/test_hsm2dongle_update_ancestor.py: 96%
107 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23from unittest.mock import Mock, patch, call
24from parameterized import parameterized
25from ..test_hsm2dongle import TestHSM2DongleBase, HSM2DongleTestMode
26from ledger.hsm2dongle import HSM2DongleError
27from ledgerblue.commException import CommException
29import logging
31logging.disable(logging.CRITICAL)
34class TestHSM2DongleSGXUpdateAncestor(TestHSM2DongleBase):
def get_test_mode(self):
    """Return the test mode this suite runs under (SGX)."""
    mode = HSM2DongleTestMode.SGX
    return mode
def spec_to_exchange(self, spec, trim=False):
    """Build the canned dongle responses for a single block spec.

    The tests in this class feed the returned list to the mocked
    ``dongle.exchange`` as (part of) its ``side_effect``.

    Args:
        spec: a tuple describing one block. Only its length and, when it
            has three elements, its third element are inspected here:
            a three-element spec adds a brother list metadata response,
            and a non-``None`` third element adds one pair of responses
            per item in ``spec[2][0]``.
        trim: accepted for interface compatibility with callers; it is
            not used when building the responses.

    Returns:
        A list of ``bytes`` objects, one per expected exchange.
    """
    # Chunk size is ignored when requesting block data, full bh is always sent

    # Block header
    exchanges = [
        bytes([0, 0, 0x03]),
        bytes([0, 0, 0x04, 0xAA]),
    ]

    # Spec has brothers?
    if len(spec) == 3:
        exchanges.append(bytes([0, 0, 0x07]))  # Request brother list metadata
        brothers = spec[2]
        if brothers is not None:
            brother_count = len(brothers[0])
            # Two responses per brother
            exchanges.extend(
                [bytes([0, 0, 0x08]), bytes([0, 0, 0x09, 0xBB])] * brother_count
            )

    return exchanges
@patch("ledger.hsm2dongle.remove_mm_fields_if_present")
@patch("ledger.hsm2dongle.rlp_mm_payload_size")
def test_update_ancestor_ok(self, mmplsize_mock, rmvflds_mock):
    """update_ancestor returns (True, 1) when all three blocks go through.

    The mocked ``remove_mm_fields_if_present`` drops as many trailing bytes
    as the value of the block's last byte, so the data sent to the dongle
    is each block without its trailer.
    """
    def strip_trailer(block_hex):
        # Last byte of the block says how many trailing bytes to remove
        trailer_len = bytes.fromhex(block_hex)[-1]
        return block_hex[:-trailer_len * 2]

    def payload_size(block_hex):
        return len(block_hex) // 8

    rmvflds_mock.side_effect = strip_trailer
    mmplsize_mock.side_effect = payload_size

    # (block bytes, chunk size)
    blocks_spec = [
        (self.buf(300) + bytes.fromhex("aabbccddeeff0011220a"), None),
        (self.buf(250) + bytes.fromhex("1122334405"), None),
        (
            self.buf(130) + bytes.fromhex("334455aabbccdd2211982311aacdfe10"),
            None,
        ),
    ]

    responses = []
    for spec in blocks_spec:
        responses.extend(self.spec_to_exchange(spec, trim=True))
    responses.append(bytes([0, 0, 0x05]))  # Success response
    self.dongle.exchange.side_effect = responses

    blocks_hex = [block.hex() for block, _ in blocks_spec]
    result = self.hsm2dongle.update_ancestor(blocks_hex)
    self.assertEqual((True, 1), result)

    first, second, third = (block for block, _ in blocks_spec)
    self.assert_exchange([
        [0x30, 0x02, 0x00, 0x00, 0x00, 0x03],  # Init, 3 blocks
        [0x30, 0x03, 0x00, 0x4B],  # Block #1 meta
        [0x30, 0x04] + list(first[:-10]),  # Block #1
        [0x30, 0x03, 0x00, 0x3E],  # Block #2 meta
        [0x30, 0x04] + list(second[:-5]),  # Block #2
        [0x30, 0x03, 0x00, 0x20],  # Block #3 meta
        [0x30, 0x04] + list(third[:-16]),  # Block #3
    ])
@parameterized.expand([
    ("prot_invalid", 0x6B87, -4),
    ("rlp_invalid", 0x6B88, -5),
    ("block_too_old", 0x6B89, -5),
    ("block_too_short", 0x6B8A, -5),
    ("parent_hash_invalid", 0x6B8B, -5),
    ("receipt_root_invalid", 0x6B8C, -5),
    ("block_num_invalid", 0x6B8D, -5),
    ("btc_header_invalid", 0x6B90, -5),
    ("mm_rlp_len_mismatch", 0x6B93, -5),
    ("buffer_overflow", 0x6B99, -5),
    ("chain_mismatch", 0x6B9A, -6),
    ("ancestor_tip_mismatch", 0x6B9C, -7),
    ("unexpected", 0x6BFF, -10),
    ("error_response", bytes([0, 0, 0xFF]), -10),
])
@patch("ledger.hsm2dongle.remove_mm_fields_if_present")
@patch("ledger.hsm2dongle.rlp_mm_payload_size")
def test_update_ancestor_bh_error_result(self, _, error_code, response,
                                         mmplsize_mock, rmvflds_mock):
    """update_ancestor returns (False, response) when block #2 data fails.

    ``error_code`` is either an int, which is raised from the exchange
    wrapped in a ``CommException``, or a ``bytes`` value, which is returned
    from the exchange as-is. ``response`` is the expected error value.
    """
    rmvflds_mock.side_effect = lambda h: h
    mmplsize_mock.side_effect = lambda h: len(h)//8
    blocks_spec = [
        # (block bytes, chunk size)
        (self.buf(300), None),
        (self.buf(250), None),
        (self.buf(140), None),
    ]

    side_effect = [
        bs for excs in map(self.spec_to_exchange, blocks_spec)
        for bs in excs
    ]
    # Make the second block fail
    # Zero-based index into the responses: the failing exchange is the
    # fifth one (init + first block meta & data + second block meta & data)
    exchange_index = 2 + 2
    if isinstance(error_code, bytes):
        side_effect[exchange_index] = error_code
    else:
        side_effect[exchange_index] = CommException("a-message", error_code)
    # Nothing is expected to be exchanged after the failure
    side_effect = side_effect[:exchange_index + 1]
    self.dongle.exchange.side_effect = side_effect

    blocks_hex = [bs[0].hex() for bs in blocks_spec]
    self.assertEqual(
        (False, response),
        self.hsm2dongle.update_ancestor(blocks_hex),
    )

    self.assert_exchange([
        [0x30, 0x02, 0x00, 0x00, 0x00, 0x03],  # Init, 3 blocks
        [0x30, 0x03, 0x00, 0x4B],  # Block #1 meta
        [0x30, 0x04] + list(blocks_spec[0][0]),  # Block #1
        [0x30, 0x03, 0x00, 0x3E],  # Block #2 meta
        [0x30, 0x04] + list(blocks_spec[1][0]),  # Block #2
    ])
@parameterized.expand([
    ("prot_invalid", 0x6B87, -3),
    ("unexpected", 0x6BFF, -10),
    ("error_response", bytes([0, 0, 0xFF]), -10),
])
@patch("ledger.hsm2dongle.remove_mm_fields_if_present")
@patch("ledger.hsm2dongle.rlp_mm_payload_size")
def test_update_ancestor_metadata_error_result(self, _, error_code, response,
                                               mmplsize_mock, rmvflds_mock):
    """update_ancestor returns (False, response) when block #3 meta fails.

    ``error_code`` is either an int, which is raised from the exchange
    wrapped in a ``CommException``, or a ``bytes`` value, which is returned
    from the exchange as-is. ``response`` is the expected error value.
    """
    rmvflds_mock.side_effect = lambda h: h
    mmplsize_mock.side_effect = lambda h: len(h)//8
    blocks_spec = [
        # (block bytes, chunk size)
        (self.buf(300), None),
        (self.buf(250), None),
        (self.buf(140), None),
    ]

    side_effect = [
        bs for excs in map(self.spec_to_exchange, blocks_spec)
        for bs in excs
    ]
    # Make the metadata of the third block fail
    # Zero-based index into the responses: the failing exchange comes
    # after init + first and second block meta & chunks
    exchange_index = 1 + 2 + 2
    if isinstance(error_code, bytes):
        side_effect[exchange_index] = error_code
    else:
        side_effect[exchange_index] = CommException("a-message", error_code)
    # Nothing is expected to be exchanged after the failure
    side_effect = side_effect[:exchange_index + 1]
    self.dongle.exchange.side_effect = side_effect

    blocks_hex = [bs[0].hex() for bs in blocks_spec]
    self.assertEqual(
        (False, response),
        self.hsm2dongle.update_ancestor(blocks_hex),
    )

    self.assert_exchange([
        [0x30, 0x02, 0x00, 0x00, 0x00, 0x03],  # Init, 3 blocks
        [0x30, 0x03, 0x00, 0x4B],  # Block #1 meta
        [0x30, 0x04] + list(blocks_spec[0][0]),  # Block #1
        [0x30, 0x03, 0x00, 0x3E],  # Block #2 meta
        [0x30, 0x04] + list(blocks_spec[1][0]),  # Block #2
        [0x30, 0x03, 0x00, 0x23],  # Block #3 meta
    ])
@patch("ledger.hsm2dongle.remove_mm_fields_if_present")
@patch("ledger.hsm2dongle.rlp_mm_payload_size")
def test_update_ancestor_metadata_error_generating(self, mmplsize_mock, rmvflds_mock):
    """update_ancestor returns (False, -2) if the payload size helper raises.

    Only the init exchange is expected, and the mocked
    ``rlp_mm_payload_size`` must have been called just for the first block.
    """
    blocks = ["first-block", "second-block"]
    init_response = bytes([0, 0, 0x03])

    self.dongle.exchange.side_effect = [init_response]
    mmplsize_mock.side_effect = ValueError()
    rmvflds_mock.side_effect = lambda block: block

    outcome = self.hsm2dongle.update_ancestor(blocks)
    self.assertEqual((False, -2), outcome)

    init_request = [0x30, 0x02, 0x00, 0x00, 0x00, 0x02]  # Init, 2 blocks
    self.assert_exchange([init_request])
    self.assertEqual([call("first-block")], mmplsize_mock.call_args_list)
@parameterized.expand([
    ("prot_invalid", CommException("a-message", 0x6B87), -1),
    ("unexpected", CommException("a-message", 0x6BFF), -10),
    ("invalid_response", bytes([0, 0, 0xFF]), -10),
])
@patch("ledger.hsm2dongle.remove_mm_fields_if_present")
def test_update_ancestor_init_error(self, _, error, response, rmvflds_mock):
    """update_ancestor returns (False, response) when the init exchange fails.

    ``error`` is used as the single side effect of the mocked exchange: an
    exception instance is raised, a ``bytes`` value is returned.
    """
    blocks = ["first-block", "second-block"]
    self.dongle.exchange.side_effect = [error]
    rmvflds_mock.side_effect = lambda block: block

    outcome = self.hsm2dongle.update_ancestor(blocks)
    self.assertEqual((False, response), outcome)

    init_request = [0x30, 0x02, 0x00, 0x00, 0x00, 0x02]  # Init, 2 blocks
    self.assert_exchange([init_request])
@patch("ledger.hsm2dongle.remove_mm_fields_if_present")
def test_update_ancestor_remove_mmfields_exception(self, rmvflds_mock):
    """update_ancestor returns (False, -8) if removing the mm fields raises.

    No exchange with the dongle is expected in that case.
    """
    rmvflds_mock.side_effect = ValueError("an error")
    blocks = ["first-block", "second-block"]

    outcome = self.hsm2dongle.update_ancestor(blocks)

    self.assertEqual((False, -8), outcome)
    self.assert_exchange([])
def test_authorize_signer_ok(self):
    """authorize_signer is truthy when the second signature gets an OK."""
    signer_authorization = Mock(
        signer_version=Mock(hash="ee"*32, iteration=0x4321),
        signatures=["aa"*20, "bb"*25],
    )
    responses = [
        bytes(),  # Response to hash, iteration - doesn't matter
        bytes.fromhex("aaaaaa01"),  # Response to first signature, MORE
        bytes.fromhex("aaaaaa02"),  # Response to second signature, OK
    ]
    self.dongle.exchange.side_effect = responses

    authorized = self.hsm2dongle.authorize_signer(signer_authorization)
    self.assertTrue(authorized)

    # Sigver, hash plus iteration
    sigver_request = [0x51, 0x01] + [0xee]*32 + [0x43, 0x21]
    first_signature = [0x51, 0x02] + [0xaa]*20  # Signature #1
    second_signature = [0x51, 0x02] + [0xbb]*25  # Signature #2
    self.assert_exchange([sigver_request, first_signature, second_signature])
def test_authorize_signer_ok_first_sig(self):
    """authorize_signer is truthy as soon as the first signature gets an OK.

    The second signature is not expected to be sent.
    """
    signer_authorization = Mock(
        signer_version=Mock(hash="ee"*32, iteration=0x4321),
        signatures=["aa"*20, "bb"*25],
    )
    responses = [
        bytes(),  # Response to hash, iteration - doesn't matter
        bytes.fromhex("aaaaaa02"),  # Response to first signature, OK
    ]
    self.dongle.exchange.side_effect = responses

    authorized = self.hsm2dongle.authorize_signer(signer_authorization)
    self.assertTrue(authorized)

    # Sigver, hash plus iteration
    sigver_request = [0x51, 0x01] + [0xee]*32 + [0x43, 0x21]
    first_signature = [0x51, 0x02] + [0xaa]*20  # Signature #1
    self.assert_exchange([sigver_request, first_signature])
def test_authorize_signer_sigver_error(self):
    """authorize_signer raises HSM2DongleError if the first exchange fails."""
    signer_authorization = Mock(
        signer_version=Mock(hash="ee"*32, iteration=0x4321),
        signatures=["aa"*20, "bb"*25],
    )
    # Response to hash, iteration - error
    self.dongle.exchange.side_effect = [CommException("an-error")]

    with self.assertRaises(HSM2DongleError):
        self.hsm2dongle.authorize_signer(signer_authorization)

    # Sigver, hash plus iteration
    sigver_request = [0x51, 0x01] + [0xee]*32 + [0x43, 0x21]
    self.assert_exchange([sigver_request])
def test_authorize_signer_signature_error(self):
    """authorize_signer raises HSM2DongleError if a signature exchange fails.

    The failure happens on the second signature, so all three requests
    are expected to have been sent.
    """
    signer_authorization = Mock(
        signer_version=Mock(hash="ee"*32, iteration=0x4321),
        signatures=["aa"*20, "bb"*25],
    )
    responses = [
        bytes(),  # Response to hash, iteration - doesn't matter
        bytes.fromhex("aaaaaa01"),  # Response to first signature, MORE
        CommException("an-error"),  # Response to second signature, ERROR
    ]
    self.dongle.exchange.side_effect = responses

    with self.assertRaises(HSM2DongleError):
        self.hsm2dongle.authorize_signer(signer_authorization)

    # Sigver, hash plus iteration
    sigver_request = [0x51, 0x01] + [0xee]*32 + [0x43, 0x21]
    first_signature = [0x51, 0x02] + [0xaa]*20  # Signature #1
    second_signature = [0x51, 0x02] + [0xbb]*25  # Signature #2
    self.assert_exchange([sigver_request, first_signature, second_signature])
def test_authorize_not_enough_signatures(self):
    """authorize_signer raises HSM2DongleError if every signature gets MORE.

    Both signatures are sent, but neither response is an OK.
    """
    signer_authorization = Mock(
        signer_version=Mock(hash="ee"*32, iteration=0x4321),
        signatures=["aa"*20, "bb"*25],
    )
    responses = [
        bytes(),  # Response to hash, iteration - doesn't matter
        bytes.fromhex("aaaaaa01"),  # Response to first signature, MORE
        bytes.fromhex("aaaaaa01"),  # Response to second signature, MORE
    ]
    self.dongle.exchange.side_effect = responses

    with self.assertRaises(HSM2DongleError):
        self.hsm2dongle.authorize_signer(signer_authorization)

    # Sigver, hash plus iteration
    sigver_request = [0x51, 0x01] + [0xee]*32 + [0x43, 0x21]
    first_signature = [0x51, 0x02] + [0xaa]*20  # Signature #1
    second_signature = [0x51, 0x02] + [0xbb]*25  # Signature #2
    self.assert_exchange([sigver_request, first_signature, second_signature])