Coverage for tests/ledger/hsm2dongle_cmds/test_hsm2dongle_update_ancestor.py: 100%
96 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23from unittest.mock import Mock, patch, call
24from parameterized import parameterized
25from ..test_hsm2dongle import TestHSM2DongleBase
26from ledger.hsm2dongle import HSM2DongleError
27from ledgerblue.commException import CommException
29import logging
31logging.disable(logging.CRITICAL)
34class TestHSM2DongleUpdateAncestor(TestHSM2DongleBase):
@patch("ledger.hsm2dongle.remove_mm_fields_if_present")
@patch("ledger.hsm2dongle.rlp_mm_payload_size")
def test_update_ancestor_ok(self, mmplsize_mock, rmvflds_mock):
    # Happy path: three blocks are sent to the dongle chunk by chunk and
    # update_ancestor is expected to return (True, 1).

    def drop_trailer(block_hex):
        # Stand-in for remove_mm_fields_if_present: the last byte of the block
        # tells how many trailing bytes (two hex characters each) to drop.
        trailer_size = bytes.fromhex(block_hex)[-1]
        return block_hex[:-(trailer_size*2)]

    rmvflds_mock.side_effect = drop_trailer
    mmplsize_mock.side_effect = lambda block_hex: len(block_hex)//8

    # Each spec entry is (block bytes, chunk size)
    first_block = self.buf(300) + bytes.fromhex("aabbccddeeff0011220a")
    second_block = self.buf(250) + bytes.fromhex("1122334405")
    third_block = self.buf(130) + \
        bytes.fromhex("334455aabbccdd2211982311aacdfe10")
    blocks_spec = [(first_block, 80), (second_block, 100), (third_block, 50)]

    # Dongle replies for every block, followed by the final success response
    replies = []
    for spec in blocks_spec:
        replies.extend(self.spec_to_exchange(spec, trim=True))
    replies.append(bytes([0, 0, 0x05]))  # Success response
    self.dongle.exchange.side_effect = replies

    blocks_hex = [block.hex() for block, _ in blocks_spec]
    outcome = self.hsm2dongle.update_ancestor(blocks_hex)
    self.assertEqual((True, 1), outcome)

    def chunk(block, size, index, trimmed=False):
        # Expected chunk message: command prefix plus the index-th slice of
        # the block. The last chunk of each block additionally loses the
        # trailer whose length is given by the block's last byte.
        payload = block[size*index:size*(index + 1)]
        if trimmed:
            payload = payload[:-block[-1]]
        return [0x30, 0x04] + list(payload)

    self.assert_exchange([
        [0x30, 0x02, 0x00, 0x00, 0x00, 0x03],  # Init, 3 blocks
        [0x30, 0x03, 0x00, 0x4B],  # Block #1 meta
        chunk(first_block, 80, 0),  # Block #1 chunk
        chunk(first_block, 80, 1),  # Block #1 chunk
        chunk(first_block, 80, 2),  # Block #1 chunk
        chunk(first_block, 80, 3, trimmed=True),  # Block #1 chunk
        [0x30, 0x03, 0x00, 0x3E],  # Block #2 meta
        chunk(second_block, 100, 0),  # Block #2 chunk
        chunk(second_block, 100, 1),  # Block #2 chunk
        chunk(second_block, 100, 2, trimmed=True),  # Block #2 chunk
        [0x30, 0x03, 0x00, 0x20],  # Block #3 meta
        chunk(third_block, 50, 0),  # Block #3 chunk
        chunk(third_block, 50, 1),  # Block #3 chunk
        chunk(third_block, 50, 2, trimmed=True),  # Block #3 chunk
    ])
@parameterized.expand([
    ("prot_invalid", 0x6B87, -4),
    ("rlp_invalid", 0x6B88, -5),
    ("block_too_old", 0x6B89, -5),
    ("block_too_short", 0x6B8A, -5),
    ("parent_hash_invalid", 0x6B8B, -5),
    ("receipt_root_invalid", 0x6B8C, -5),
    ("block_num_invalid", 0x6B8D, -5),
    ("btc_header_invalid", 0x6B90, -5),
    ("mm_rlp_len_mismatch", 0x6B93, -5),
    ("buffer_overflow", 0x6B99, -5),
    ("chain_mismatch", 0x6B9A, -6),
    ("ancestor_tip_mismatch", 0x6B9C, -7),
    ("unexpected", 0x6BFF, -10),
    ("error_response", bytes([0, 0, 0xFF]), -10),
])
@patch("ledger.hsm2dongle.remove_mm_fields_if_present")
@patch("ledger.hsm2dongle.rlp_mm_payload_size")
def test_update_ancestor_chunk_error_result(self, _, error_code, response,
                                            mmplsize_mock, rmvflds_mock):
    # A failure while sending a block chunk must make update_ancestor return
    # (False, response) and stop exchanging with the dongle.
    #
    # error_code is either an int, which is raised by the dongle mock as a
    # CommException carrying that code, or a bytes object, which is returned
    # by the dongle mock as a raw reply.
    # response is the error value expected in the returned tuple.
    rmvflds_mock.side_effect = lambda h: h
    mmplsize_mock.side_effect = lambda h: len(h)//8
    blocks_spec = [
        # (block bytes, chunk size)
        (self.buf(300), 80),
        (self.buf(250), 100),
        (self.buf(140), 50),
    ]

    side_effect = [
        reply for spec in blocks_spec
        for reply in self.spec_to_exchange(spec)
    ]
    # Make the second chunk of the second block fail
    exchange_index = (
        1 + (300//80 + 2) + 2
    )  # Init + first block meta & chunks + second block meta & first chunk
    if isinstance(error_code, bytes):
        side_effect[exchange_index] = error_code
    else:
        side_effect[exchange_index] = CommException("a-message", error_code)
    # Nothing may be exchanged after the failing chunk
    side_effect = side_effect[:exchange_index + 1]
    self.dongle.exchange.side_effect = side_effect

    blocks_hex = [block.hex() for block, _ in blocks_spec]
    self.assertEqual(
        (False, response),
        self.hsm2dongle.update_ancestor(blocks_hex),
    )

    self.assert_exchange([
        [0x30, 0x02, 0x00, 0x00, 0x00, 0x03],  # Init, 3 blocks
        [0x30, 0x03, 0x00, 0x4B],  # Block #1 meta
        [0x30, 0x04] + list(blocks_spec[0][0][80*0:80*1]),  # Block #1 chunk
        [0x30, 0x04] + list(blocks_spec[0][0][80*1:80*2]),  # Block #1 chunk
        [0x30, 0x04] + list(blocks_spec[0][0][80*2:80*3]),  # Block #1 chunk
        [0x30, 0x04] + list(blocks_spec[0][0][80*3:80*4]),  # Block #1 chunk
        [0x30, 0x03, 0x00, 0x3E],  # Block #2 meta
        [0x30, 0x04] + list(blocks_spec[1][0][100*0:100*1]),  # Block #2 chunk
        [0x30, 0x04] + list(blocks_spec[1][0][100*1:100*2]),  # Block #2 chunk
    ])
@parameterized.expand([
    ("prot_invalid", 0x6B87, -3),
    ("unexpected", 0x6BFF, -10),
    ("error_response", bytes([0, 0, 0xFF]), -10),
])
@patch("ledger.hsm2dongle.remove_mm_fields_if_present")
@patch("ledger.hsm2dongle.rlp_mm_payload_size")
def test_update_ancestor_metadata_error_result(self, _, error_code, response,
                                               mmplsize_mock, rmvflds_mock):
    # A failure while sending a block's metadata must make update_ancestor
    # return (False, response) and stop exchanging with the dongle.
    #
    # error_code is either an int, which is raised by the dongle mock as a
    # CommException carrying that code, or a bytes object, which is returned
    # by the dongle mock as a raw reply.
    # response is the error value expected in the returned tuple.
    rmvflds_mock.side_effect = lambda h: h
    mmplsize_mock.side_effect = lambda h: len(h)//8
    blocks_spec = [
        # (block bytes, chunk size)
        (self.buf(300), 80),
        (self.buf(250), 100),
        (self.buf(140), 50),
    ]

    side_effect = [
        reply for spec in blocks_spec
        for reply in self.spec_to_exchange(spec)
    ]
    # Make the metadata of the third block fail
    exchange_index = (
        1 + (300//80 + 2) + (250//100 + 2)
    )  # Init + first and second block meta & chunks + third block meta
    if isinstance(error_code, bytes):
        side_effect[exchange_index] = error_code
    else:
        side_effect[exchange_index] = CommException("a-message", error_code)
    # Nothing may be exchanged after the failing metadata message
    side_effect = side_effect[:exchange_index + 1]
    self.dongle.exchange.side_effect = side_effect

    blocks_hex = [block.hex() for block, _ in blocks_spec]
    self.assertEqual(
        (False, response),
        self.hsm2dongle.update_ancestor(blocks_hex),
    )

    self.assert_exchange([
        [0x30, 0x02, 0x00, 0x00, 0x00, 0x03],  # Init, 3 blocks
        [0x30, 0x03, 0x00, 0x4B],  # Block #1 meta
        [0x30, 0x04] + list(blocks_spec[0][0][80*0:80*1]),  # Block #1 chunk
        [0x30, 0x04] + list(blocks_spec[0][0][80*1:80*2]),  # Block #1 chunk
        [0x30, 0x04] + list(blocks_spec[0][0][80*2:80*3]),  # Block #1 chunk
        [0x30, 0x04] + list(blocks_spec[0][0][80*3:80*4]),  # Block #1 chunk
        [0x30, 0x03, 0x00, 0x3E],  # Block #2 meta
        [0x30, 0x04] + list(blocks_spec[1][0][100*0:100*1]),  # Block #2 chunk
        [0x30, 0x04] + list(blocks_spec[1][0][100*1:100*2]),  # Block #2 chunk
        [0x30, 0x04] + list(blocks_spec[1][0][100*2:100*3]),  # Block #2 chunk
        [0x30, 0x03, 0x00, 0x23],  # Block #3 meta
    ])
@patch("ledger.hsm2dongle.remove_mm_fields_if_present")
@patch("ledger.hsm2dongle.rlp_mm_payload_size")
def test_update_ancestor_metadata_error_generating(self, mmplsize_mock, rmvflds_mock):
    # When computing the payload size for the first block raises, the
    # expected result is (False, -2) with only the init message exchanged.
    def passthrough(block_hex):
        return block_hex

    rmvflds_mock.side_effect = passthrough
    mmplsize_mock.side_effect = ValueError()
    init_reply = bytes([0, 0, 0x03])
    self.dongle.exchange.side_effect = [init_reply]

    blocks = ["first-block", "second-block"]
    outcome = self.hsm2dongle.update_ancestor(blocks)
    self.assertEqual((False, -2), outcome)

    expected_exchanges = [
        [0x30, 0x02, 0x00, 0x00, 0x00, 0x02],  # Init, 2 blocks
    ]
    self.assert_exchange(expected_exchanges)
    # The size computation must have been attempted for the first block only
    self.assertEqual([call("first-block")], mmplsize_mock.call_args_list)
@parameterized.expand([
    ("prot_invalid", CommException("a-message", 0x6B87), -1),
    ("unexpected", CommException("a-message", 0x6BFF), -10),
    ("invalid_response", bytes([0, 0, 0xFF]), -10),
])
@patch("ledger.hsm2dongle.remove_mm_fields_if_present")
def test_update_ancestor_init_error(self, _, error, response, rmvflds_mock):
    # The init exchange either raises or returns `error`; update_ancestor is
    # expected to return (False, response) without sending anything else.
    def passthrough(block_hex):
        return block_hex

    rmvflds_mock.side_effect = passthrough
    self.dongle.exchange.side_effect = [error]

    blocks = ["first-block", "second-block"]
    outcome = self.hsm2dongle.update_ancestor(blocks)
    self.assertEqual((False, response), outcome)

    expected_exchanges = [
        [0x30, 0x02, 0x00, 0x00, 0x00, 0x02],  # Init, 2 blocks
    ]
    self.assert_exchange(expected_exchanges)
@patch("ledger.hsm2dongle.remove_mm_fields_if_present")
def test_update_ancestor_remove_mmfields_exception(self, rmvflds_mock):
    # When remove_mm_fields_if_present raises, the expected result is
    # (False, -8) and no message at all is exchanged with the dongle.
    rmvflds_mock.side_effect = ValueError("an error")

    blocks = ["first-block", "second-block"]
    outcome = self.hsm2dongle.update_ancestor(blocks)
    self.assertEqual((False, -8), outcome)

    no_exchanges = []
    self.assert_exchange(no_exchanges)
def test_authorize_signer_ok(self):
    # Two signatures are sent: the first one is answered with MORE and the
    # second one with OK, so authorize_signer must return a truthy value.
    replies = [
        bytes(),  # Response to hash, iteration - doesn't matter
        bytes.fromhex("aaaaaa01"),  # Response to first signature, MORE
        bytes.fromhex("aaaaaa02"),  # Response to second signature, OK
    ]
    self.dongle.exchange.side_effect = replies

    signer_version = Mock(hash="ee"*32, iteration=0x4321)
    signer_authorization = Mock(
        signer_version=signer_version,
        signatures=["aa"*20, "bb"*25],
    )
    outcome = self.hsm2dongle.authorize_signer(signer_authorization)
    self.assertTrue(outcome)

    sigver_message = [0x51, 0x01] + [0xee]*32 + [0x43, 0x21]
    self.assert_exchange([
        sigver_message,  # Sigver, hash plus iteration
        [0x51, 0x02] + [0xaa]*20,  # Signature #1
        [0x51, 0x02] + [0xbb]*25,  # Signature #2
    ])
def test_authorize_signer_ok_first_sig(self):
    # The first signature is already answered with OK, so the second
    # signature must never be sent to the dongle.
    replies = [
        bytes(),  # Response to hash, iteration - doesn't matter
        bytes.fromhex("aaaaaa02"),  # Response to first signature, OK
    ]
    self.dongle.exchange.side_effect = replies

    signer_version = Mock(hash="ee"*32, iteration=0x4321)
    signer_authorization = Mock(
        signer_version=signer_version,
        signatures=["aa"*20, "bb"*25],
    )
    outcome = self.hsm2dongle.authorize_signer(signer_authorization)
    self.assertTrue(outcome)

    sigver_message = [0x51, 0x01] + [0xee]*32 + [0x43, 0x21]
    self.assert_exchange([
        sigver_message,  # Sigver, hash plus iteration
        [0x51, 0x02] + [0xaa]*20,  # Signature #1
    ])
def test_authorize_signer_sigver_error(self):
    # The very first exchange (hash plus iteration) fails with a
    # CommException, which must surface as an HSM2DongleError.
    self.dongle.exchange.side_effect = [
        CommException("an-error"),  # Response to hash, iteration - error
    ]

    signer_version = Mock(hash="ee"*32, iteration=0x4321)
    signer_authorization = Mock(
        signer_version=signer_version,
        signatures=["aa"*20, "bb"*25],
    )
    with self.assertRaises(HSM2DongleError):
        self.hsm2dongle.authorize_signer(signer_authorization)

    sigver_message = [0x51, 0x01] + [0xee]*32 + [0x43, 0x21]
    self.assert_exchange([
        sigver_message,  # Sigver, hash plus iteration
    ])
def test_authorize_signer_signature_error(self):
    # The exchange for the second signature fails with a CommException,
    # which must surface as an HSM2DongleError.
    replies = [
        bytes(),  # Response to hash, iteration - doesn't matter
        bytes.fromhex("aaaaaa01"),  # Response to first signature, MORE
        CommException("an-error"),  # Response to second signature, ERROR
    ]
    self.dongle.exchange.side_effect = replies

    signer_version = Mock(hash="ee"*32, iteration=0x4321)
    signer_authorization = Mock(
        signer_version=signer_version,
        signatures=["aa"*20, "bb"*25],
    )
    with self.assertRaises(HSM2DongleError):
        self.hsm2dongle.authorize_signer(signer_authorization)

    sigver_message = [0x51, 0x01] + [0xee]*32 + [0x43, 0x21]
    self.assert_exchange([
        sigver_message,  # Sigver, hash plus iteration
        [0x51, 0x02] + [0xaa]*20,  # Signature #1
        [0x51, 0x02] + [0xbb]*25,  # Signature #2
    ])
def test_authorize_not_enough_signatures(self):
    # Every signature is answered with MORE and there are none left to send,
    # so authorize_signer must raise an HSM2DongleError.
    replies = [
        bytes(),  # Response to hash, iteration - doesn't matter
        bytes.fromhex("aaaaaa01"),  # Response to first signature, MORE
        bytes.fromhex("aaaaaa01"),  # Response to second signature, MORE
    ]
    self.dongle.exchange.side_effect = replies

    signer_version = Mock(hash="ee"*32, iteration=0x4321)
    signer_authorization = Mock(
        signer_version=signer_version,
        signatures=["aa"*20, "bb"*25],
    )
    with self.assertRaises(HSM2DongleError):
        self.hsm2dongle.authorize_signer(signer_authorization)

    sigver_message = [0x51, 0x01] + [0xee]*32 + [0x43, 0x21]
    self.assert_exchange([
        sigver_message,  # Sigver, hash plus iteration
        [0x51, 0x02] + [0xaa]*20,  # Signature #1
        [0x51, 0x02] + [0xbb]*25,  # Signature #2
    ])