Coverage for tests/ledger/test_hsm2dongle.py: 99%
427 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-05 20:41 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-05 20:41 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23from unittest import TestCase
24from unittest.mock import Mock, patch, call
25from parameterized import parameterized
26from ledger.hsm2dongle import (
27 HSM2Dongle,
28 HSM2DongleError,
29 HSM2DongleCommError,
30 HSM2DongleTimeoutError,
31 HSM2DongleErrorResult,
32)
33from ledger.version import HSM2FirmwareVersion
34from ledgerblue.commException import CommException
36import logging
38logging.disable(logging.CRITICAL)
41class TestHSM2DongleBase(TestCase):
42 DONGLE_EXCHANGE_TIMEOUT = 10
44 CHUNK_ERROR_MAPPINGS = [
45 ("prot_invalid", 0x6B87, -4),
46 ("rlp_invalid", 0x6B88, -5),
47 ("block_too_old", 0x6B89, -5),
48 ("block_too_short", 0x6B8A, -5),
49 ("parent_hash_invalid", 0x6B8B, -5),
50 ("block_num_invalid", 0x6B8D, -5),
51 ("block_diff_invalid", 0x6B8E, -5),
52 ("umm_root_invalid", 0x6B8F, -5),
53 ("btc_header_invalid", 0x6B90, -5),
54 ("merkle_proof_invalid", 0x6B91, -5),
55 ("btc_cb_txn_invalid", 0x6B92, -6),
56 ("mm_rlp_len_mismatch", 0x6B93, -5),
57 ("btc_diff_mismatch", 0x6B94, -6),
58 ("merkle_proof_mismatch", 0x6B95, -6),
59 ("mm_hash_mismatch", 0x6B96, -6),
60 ("merkle_proof_overflow", 0x6B97, -5),
61 ("cb_txn_overflow", 0x6B98, -5),
62 ("buffer_overflow", 0x6B99, -5),
63 ("chain_mismatch", 0x6B9A, -7),
64 ("total_diff_overflow", 0x6B9B, -8),
65 ("cb_txn_hash_mismatch", 0x6B9D, -6),
66 ("brothers_too_many", 0x6B9E, -9),
67 ("brother_parent_mismatch", 0x6B9F, -9),
68 ("brother_same_as_block", 0x6BA0, -9),
69 ("brother_order_invalid", 0x6BA1, -9),
70 ("unexpected", 0x6BFF, -10),
71 ("error_response", bytes([0, 0, 0xFF]), -10),
72 ]
74 @patch("ledger.hsm2dongle.getDongle")
75 def setUp(self, getDongleMock):
76 self.dongle = Mock()
77 self.getDongleMock = getDongleMock
78 self.getDongleMock.return_value = self.dongle
79 self.hsm2dongle = HSM2Dongle("a-debug-value")
80 self.hsm2dongle.connect()
82 def buf(self, size):
83 return bytes(map(lambda b: b % 256, range(size)))
85 def parse_exchange_spec(self, spec, stop=None, replace=None):
86 rqs = []
87 rps = []
88 rq = True
89 stopped = False
90 for line in spec:
91 delim = ">" if rq else "<"
92 delim_pos = line.find(delim)
93 if delim_pos == -1:
94 raise RuntimeError("Invalid spec prefix")
95 name = line[:delim_pos].strip()
96 if name == stop:
97 if replace is not None:
98 (rqs if rq else rps).append(replace)
99 stopped = True
100 break
101 (rqs if rq else rps).append(
102 bytes.fromhex("80" + line[delim_pos+1:].replace(" ", ""))
103 )
104 rq = not rq
106 if stop is not None and not stopped:
107 raise RuntimeError(f"Invalid spec parsing: specified stop at '{stop}' "
108 "but exchange not found")
109 return {"requests": rqs, "responses": rps}
111 def spec_to_exchange(self, spec, trim=False):
112 trim_length = spec[0][-1] if trim else 0
113 block_size = len(spec[0]) - trim_length
114 chunk_size = spec[1]
115 exchanges = [bytes([0, 0, 0x04, chunk_size])]*(block_size//chunk_size)
116 remaining = block_size - len(exchanges)*chunk_size
117 exchanges = [bytes([0, 0, 0x03])] + exchanges + \
118 [bytes([0, 0, 0x04, remaining])]
120 # Spec has brothers?
121 if len(spec) == 3:
122 exchanges += [bytes([0, 0, 0x07])] # Request brother list metadata
123 if len(spec) == 3 and spec[2] is not None:
124 brother_count = len(spec[2][0])
125 chunk_size = spec[2][1]
126 for i in range(brother_count):
127 brother_size = len(spec[2][0][i])
128 bro_exchanges = [bytes([0, 0, 0x09, chunk_size])] * \
129 (brother_size//chunk_size)
130 remaining = brother_size - len(bro_exchanges)*chunk_size
131 exchanges += [bytes([0, 0, 0x08])] + \
132 bro_exchanges + \
133 [bytes([0, 0, 0x09, remaining])]
135 return exchanges
137 def assert_exchange(self, payloads, timeouts=None):
138 def ensure_cla(bs):
139 if bs[0] != 0x80:
140 return bytes([0x80]) + bs
141 return bs
143 if timeouts is None:
144 timeouts = [None]*len(payloads)
145 calls = list(
146 map(
147 lambda z: call(
148 ensure_cla(bytes(z[0])),
149 timeout=(z[1] if z[1] is not None else self.DONGLE_EXCHANGE_TIMEOUT),
150 ),
151 zip(payloads, timeouts),
152 ))
154 self.assertEqual(
155 len(payloads),
156 len(self.dongle.exchange.call_args_list),
157 msg="# of exchanges mismatch",
158 )
160 for i, c in enumerate(calls):
161 if c != self.dongle.exchange.call_args_list[i]:
162 print("E:", c)
163 print("A:", self.dongle.exchange.call_args_list[i])
164 self.assertEqual(
165 c,
166 self.dongle.exchange.call_args_list[i],
167 msg="%dth exchange failed" % (i + 1),
168 )
170 def do_sign_auth(self, spec):
171 return self.hsm2dongle.sign_authorized(
172 key_id=spec["keyid"],
173 rsk_tx_receipt=spec["receipt"],
174 receipt_merkle_proof=spec["mp"],
175 btc_tx=spec["tx"],
176 input_index=spec["input"],
177 sighash_computation_mode=spec["mode"],
178 witness_script=spec["ws"],
179 outpoint_value=spec["ov"],
180 )
182 def process_sign_auth_spec(self, spec, stop=None, replace=None):
183 pex = self.parse_exchange_spec(spec["exchanges"], stop=stop, replace=replace)
184 spec["requests"] = pex["requests"]
185 spec["responses"] = pex["responses"]
186 self.dongle.exchange.side_effect = spec["responses"]
187 return spec
190class TestHSM2Dongle(TestHSM2DongleBase):
191 def test_dongle_error_codes(self):
192 # Make sure enums are ok wrt signer definitions by testing a couple
193 # of arbitrary values
194 self.assertEqual(0x6B8C, self.hsm2dongle.ERR.ADVANCE.RECEIPT_ROOT_INVALID.value)
195 self.assertEqual(0x6B93, self.hsm2dongle.ERR.ADVANCE.MM_RLP_LEN_MISMATCH.value)
196 self.assertEqual(0x6BA1, self.hsm2dongle.ERR.ADVANCE.BROTHER_ORDER_INVALID.value)
197 self.assertEqual(0x6A8F, self.hsm2dongle.ERR.SIGN.INVALID_PATH)
198 self.assertEqual(
199 0x6A97,
200 self.hsm2dongle.ERR.SIGN.INVALID_SIGHASH_COMPUTATION_MODE.value
201 )
203 def test_connects_ok(self):
204 self.assertEqual([call("a-debug-value")], self.getDongleMock.call_args_list)
206 @patch("ledger.hsm2dongle.getDongle")
207 def test_connects_error_comm(self, getDongleMock):
208 getDongleMock.side_effect = CommException("a-message")
209 with self.assertRaises(HSM2DongleCommError):
210 self.hsm2dongle.connect()
212 @patch("ledger.hsm2dongle.getDongle")
213 def test_connects_error_other(self, getDongleMock):
214 getDongleMock.side_effect = ValueError()
215 with self.assertRaises(ValueError):
216 self.hsm2dongle.connect()
218 def test_get_current_mode(self):
219 self.dongle.exchange.return_value = bytes([10, 2, 30])
220 mode = self.hsm2dongle.get_current_mode()
221 self.assertEqual(2, mode)
222 self.assertEqual(self.hsm2dongle.MODE, type(mode))
223 self.assert_exchange([[0x43]])
225 def test_echo(self):
226 self.dongle.exchange.return_value = bytes([0x80, 0x02, 0x41, 0x42, 0x43])
227 self.assertTrue(self.hsm2dongle.echo())
228 self.assert_exchange([[0x02, 0x41, 0x42, 0x43]])
230 def test_echo_error(self):
231 self.dongle.exchange.return_value = bytes([1, 2, 3])
232 self.assertFalse(self.hsm2dongle.echo())
233 self.assert_exchange([[0x02, 0x41, 0x42, 0x43]])
235 def test_is_onboarded_yes(self):
236 self.dongle.exchange.return_value = bytes([0, 1, 0])
237 self.assertTrue(self.hsm2dongle.is_onboarded())
238 self.assert_exchange([[0x06]])
240 def test_is_onboarded_no(self):
241 self.dongle.exchange.return_value = bytes([0, 0, 0])
242 self.assertFalse(self.hsm2dongle.is_onboarded())
243 self.assert_exchange([[0x06]])
245 def test_onboard_ok(self):
246 self.dongle.exchange.side_effect = [bytes([0])]*(32 + 5) + [bytes([0, 2, 0])]
248 self.assertTrue(
249 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234"))
251 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(32)))
252 pin_exchanges = [[0x41, 0, 4]] + list(
253 map(lambda i: [0x41, i + 1, ord(str(i + 1))], range(4)))
254 exchanges = seed_exchanges + pin_exchanges + [[0x07]]
255 timeouts = [None]*len(exchanges)
256 timeouts[-1] = HSM2Dongle.ONBOARDING.TIMEOUT
257 self.assert_exchange(exchanges, timeouts)
259 def test_onboard_wipe_error(self):
260 self.dongle.exchange.side_effect = [bytes([0])]*(32 + 5) + [bytes([0, 1, 0])]
262 with self.assertRaises(HSM2DongleError):
263 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234")
265 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(32)))
266 pin_exchanges = [[0x41, 0, 4]] + list(
267 map(lambda i: [0x41, i + 1, ord(str(i + 1))], range(4)))
268 exchanges = seed_exchanges + pin_exchanges + [[0x07]]
269 timeouts = [None]*len(exchanges)
270 timeouts[-1] = HSM2Dongle.ONBOARDING.TIMEOUT
271 self.assert_exchange(exchanges, timeouts)
273 def test_onboard_pin_error(self):
274 self.dongle.exchange.side_effect = [bytes([0])]*(32 + 3) + [
275 CommException("an-error")
276 ]
278 with self.assertRaises(HSM2DongleError):
279 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234")
281 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(32)))
282 pin_exchanges = [[0x41, 0, 4]] + list(
283 map(lambda i: [0x41, i + 1, ord(str(i + 1))], range(3)))
284 exchanges = seed_exchanges + pin_exchanges
285 self.assert_exchange(exchanges)
287 def test_onboard_seed_error(self):
288 self.dongle.exchange.side_effect = [bytes([0])]*30 + [CommException("an-error")]
290 with self.assertRaises(HSM2DongleError):
291 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234")
293 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(31)))
294 self.assert_exchange(seed_exchanges)
296 def test_unlock_ok(self):
297 self.dongle.exchange.side_effect = [
298 bytes([0]),
299 bytes([1]),
300 bytes([2]),
301 bytes([0, 0, 1]),
302 ]
303 self.assertTrue(self.hsm2dongle.unlock(bytes([1, 2, 3])))
304 self.assert_exchange([[0x41, 0, 1], [0x41, 1, 2], [0x41, 2, 3],
305 [0xFE, 0x00, 0x00]])
307 def test_unlock_pinerror(self):
308 self.dongle.exchange.side_effect = [
309 bytes([0]),
310 bytes([1]),
311 bytes([2]),
312 bytes([0, 0, 0]),
313 ]
314 self.assertFalse(self.hsm2dongle.unlock(bytes([1, 2, 3])))
315 self.assert_exchange([[0x41, 0, 1], [0x41, 1, 2], [0x41, 2, 3],
316 [0xFE, 0x00, 0x00]])
318 def test_new_pin(self):
319 self.dongle.exchange.side_effect = [
320 bytes([0]),
321 bytes([1]),
322 bytes([2]),
323 bytes([3]),
324 bytes([4]),
325 ]
326 self.hsm2dongle.new_pin(bytes([4, 5, 6]))
327 self.assert_exchange([[0x41, 0, 3], [0x41, 1, 4], [0x41, 2, 5], [0x41, 3, 6],
328 [0x08]])
330 def test_version(self):
331 self.dongle.exchange.return_value = bytes([0, 0, 6, 7, 8])
332 version = self.hsm2dongle.get_version()
333 self.assertEqual(HSM2FirmwareVersion, type(version))
334 self.assertEqual(6, version.major)
335 self.assertEqual(7, version.minor)
336 self.assertEqual(8, version.patch)
337 self.assert_exchange([[0x06]])
339 def test_retries(self):
340 self.dongle.exchange.return_value = bytes([0, 0, 57])
341 retries = self.hsm2dongle.get_retries()
342 self.assertEqual(57, retries)
343 self.assert_exchange([[0x45]])
345 def test_exit_menu(self):
346 self.dongle.exchange.return_value = bytes([0])
347 self.hsm2dongle.exit_menu()
348 self.assert_exchange([[0xFF, 0x00, 0x00]])
350 def test_exit_menu_explicit_autoexec(self):
351 self.dongle.exchange.return_value = bytes([0])
352 self.hsm2dongle.exit_menu(autoexec=True)
353 self.assert_exchange([[0xFF, 0x00, 0x00]])
355 def test_exit_menu_no_autoexec(self):
356 self.dongle.exchange.return_value = bytes([0])
357 self.hsm2dongle.exit_menu(autoexec=False)
358 self.assert_exchange([[0xFA, 0x00, 0x00]])
360 def test_exit_app(self):
361 self.dongle.exchange.side_effect = OSError("read error")
362 with self.assertRaises(HSM2DongleCommError):
363 self.hsm2dongle.exit_app()
364 self.assert_exchange([[0xFF]])
366 def test_get_public_key_ok(self):
367 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")})
368 self.dongle.exchange.return_value = bytes.fromhex("aabbccddee")
369 self.assertEqual("aabbccddee", self.hsm2dongle.get_public_key(key_id))
370 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]])
372 def test_get_public_key_invalid_keyid(self):
373 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")})
374 self.dongle.exchange.side_effect = CommException("some message", 0x6A87)
375 with self.assertRaises(HSM2DongleErrorResult):
376 self.hsm2dongle.get_public_key(key_id)
377 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]])
379 def test_get_public_key_timeout(self):
380 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")})
381 self.dongle.exchange.side_effect = CommException("Timeout")
382 with self.assertRaises(HSM2DongleTimeoutError):
383 self.hsm2dongle.get_public_key(key_id)
384 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]])
386 def test_get_public_key_other_error(self):
387 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")})
388 self.dongle.exchange.side_effect = CommException("some other message", 0xFFFF)
389 with self.assertRaises(HSM2DongleError):
390 self.assertEqual("aabbccddee", self.hsm2dongle.get_public_key(key_id))
391 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]])
394class TestHSM2DongleSignUnauthorized(TestHSM2DongleBase):
395 @patch("ledger.hsm2dongle.HSM2DongleSignature")
396 def test_sign_unauthorized_ok(self, HSM2DongleSignatureMock):
397 HSM2DongleSignatureMock.return_value = "the-signature"
398 self.dongle.exchange.side_effect = [
399 bytes([0, 0, 0x81, 0x55, 0x66, 0x77, 0x88]), # Response to path and hash
400 ]
401 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")})
402 self.assertEqual(
403 (True, "the-signature"),
404 self.hsm2dongle.sign_unauthorized(key_id=key_id, hash="aabbccddeeff"),
405 )
407 self.assert_exchange([
408 [
409 0x02,
410 0x01,
411 0x11,
412 0x22,
413 0x33,
414 0x44,
415 0xAA,
416 0xBB,
417 0xCC,
418 0xDD,
419 0xEE,
420 0xFF,
421 ], # Path and hash
422 ])
423 self.assertEqual(
424 [call(bytes([0x55, 0x66, 0x77, 0x88]))],
425 HSM2DongleSignatureMock.call_args_list,
426 )
428 @patch("ledger.hsm2dongle.HSM2DongleSignature")
429 def test_sign_unauthorized_invalid_signature(self, HSM2DongleSignatureMock):
430 HSM2DongleSignatureMock.side_effect = ValueError()
431 self.dongle.exchange.side_effect = [
432 bytes([0, 0, 0x81, 0x55, 0x66, 0x77, 0x88]), # Response to path and hash
433 ]
434 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")})
435 self.assertEqual(
436 (False, -10),
437 self.hsm2dongle.sign_unauthorized(key_id=key_id, hash="aabbccddeeff"),
438 )
440 self.assert_exchange([
441 [
442 0x02,
443 0x01,
444 0x11,
445 0x22,
446 0x33,
447 0x44,
448 0xAA,
449 0xBB,
450 0xCC,
451 0xDD,
452 0xEE,
453 0xFF,
454 ], # Path and hash
455 ])
456 self.assertEqual(
457 [call(bytes([0x55, 0x66, 0x77, 0x88]))],
458 HSM2DongleSignatureMock.call_args_list,
459 )
461 @parameterized.expand([
462 ("data_size", 0x6A87, -5),
463 ("data_size_noauth", 0x6A91, -5),
464 ("invalid_path", 0x6A8F, -1),
465 ("data_size_auth", 0x6A90, -1),
466 ("unknown", 0x6AFF, -10),
467 ("btc_tx", [0, 0, 0x02], -5),
468 ("unexpected", [0, 0, 0xAA], -10),
469 ])
470 def test_sign_unauthorized_dongle_error_result(self, _, device_error,
471 expected_response):
472 if type(device_error) == int:
473 last_exchange = CommException("msg", device_error)
474 else:
475 last_exchange = bytes(device_error)
476 self.dongle.exchange.side_effect = [last_exchange] # Response to path and hash
477 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")})
478 self.assertEqual(
479 (False, expected_response),
480 self.hsm2dongle.sign_unauthorized(key_id=key_id, hash="aabbccddeeff"),
481 )
483 self.assert_exchange([
484 [
485 0x02,
486 0x01,
487 0x11,
488 0x22,
489 0x33,
490 0x44,
491 0xAA,
492 0xBB,
493 0xCC,
494 0xDD,
495 0xEE,
496 0xFF,
497 ], # Path and hash
498 ])
500 def test_sign_unauthorized_invalid_hash(self):
501 self.assertEqual(
502 (False, -5),
503 self.hsm2dongle.sign_unauthorized(key_id="doesn't matter", hash="not-a-hex"),
504 )
506 self.assertFalse(self.dongle.exchange.called)
509class TestHSM2DongleBlockchainState(TestHSM2DongleBase):
510 def test_get_blockchain_state_ok(self):
511 self.dongle.exchange.side_effect = [
512 bytes([0, 0, 0x01, 0x01]) +
513 bytes.fromhex("11"*32), # Response to get best_block
514 bytes([0, 0, 0x01, 0x02]) +
515 bytes.fromhex("22"*32), # Response to get newest_valid_block
516 bytes([0, 0, 0x01, 0x03]) +
517 bytes.fromhex("33"*32), # Response to get ancestor_block
518 bytes([0, 0, 0x01, 0x05]) +
519 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root
520 bytes([0, 0, 0x01, 0x81]) +
521 bytes.fromhex("55"*32), # Response to get updating.best_block
522 bytes([0, 0, 0x01, 0x82]) +
523 bytes.fromhex("66"*32), # Response to get updating.newest_valid_block
524 bytes([0, 0, 0x01, 0x84]) +
525 bytes.fromhex("77"*32), # Response to get updating.next_expected_block
526 bytes([0, 0, 0x02]) +
527 bytes.fromhex("112233445566"), # Response to get difficulty
528 bytes([0, 0, 0x03, 0x00, 0xFF, 0xFF]), # Response to get flags
529 ]
530 self.assertEqual(
531 {
532 "best_block":
533 "11"*32,
534 "newest_valid_block":
535 "22"*32,
536 "ancestor_block":
537 "33"*32,
538 "ancestor_receipts_root":
539 "44"*32,
540 "updating.best_block":
541 "55"*32,
542 "updating.newest_valid_block":
543 "66"*32,
544 "updating.next_expected_block":
545 "77"*32,
546 "updating.total_difficulty":
547 int.from_bytes(
548 bytes.fromhex("112233445566"), byteorder="big", signed=False),
549 "updating.in_progress":
550 False,
551 "updating.already_validated":
552 True,
553 "updating.found_best_block":
554 True,
555 },
556 self.hsm2dongle.get_blockchain_state(),
557 )
559 self.assert_exchange([
560 [0x20, 0x01, 0x01],
561 [0x20, 0x01, 0x02],
562 [0x20, 0x01, 0x03],
563 [0x20, 0x01, 0x05],
564 [0x20, 0x01, 0x81],
565 [0x20, 0x01, 0x82],
566 [0x20, 0x01, 0x84],
567 [0x20, 0x02],
568 [0x20, 0x03],
569 ])
571 def test_get_blockchain_state_error_hash(self):
572 self.dongle.exchange.side_effect = [
573 bytes([0, 0, 0x01, 0x01]) +
574 bytes.fromhex("11"*32), # Response to get best_block
575 bytes([0, 0, 0x01, 0x02]) +
576 bytes.fromhex("22"*32), # Response to get newest_valid_block
577 bytes([0, 0, 0x01, 0x03]) +
578 bytes.fromhex("33"*32), # Response to get ancestor_block
579 bytes([0, 0, 0x01, 0x05]) +
580 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root
581 bytes([0, 0, 0xAA]), # Response to get updating.best_block
582 ]
584 with self.assertRaises(HSM2DongleError):
585 self.hsm2dongle.get_blockchain_state()
587 self.assert_exchange([
588 [0x20, 0x01, 0x01],
589 [0x20, 0x01, 0x02],
590 [0x20, 0x01, 0x03],
591 [0x20, 0x01, 0x05],
592 [0x20, 0x01, 0x81],
593 ])
595 def test_get_blockchain_state_error_difficulty(self):
596 self.dongle.exchange.side_effect = [
597 bytes([0, 0, 0x01, 0x01]) +
598 bytes.fromhex("11"*32), # Response to get best_block
599 bytes([0, 0, 0x01, 0x02]) +
600 bytes.fromhex("22"*32), # Response to get newest_valid_block
601 bytes([0, 0, 0x01, 0x03]) +
602 bytes.fromhex("33"*32), # Response to get ancestor_block
603 bytes([0, 0, 0x01, 0x05]) +
604 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root
605 bytes([0, 0, 0x01, 0x81]) +
606 bytes.fromhex("55"*32), # Response to get ancestor_receipts_root
607 bytes([0, 0, 0x01, 0x82]) +
608 bytes.fromhex("66"*32), # Response to get ancestor_receipts_root
609 bytes([0, 0, 0x01, 0x84]) +
610 bytes.fromhex("77"*32), # Response to get ancestor_receipts_root
611 CommException("a-message"),
612 ]
614 with self.assertRaises(HSM2DongleError):
615 self.hsm2dongle.get_blockchain_state()
617 self.assert_exchange([
618 [0x20, 0x01, 0x01],
619 [0x20, 0x01, 0x02],
620 [0x20, 0x01, 0x03],
621 [0x20, 0x01, 0x05],
622 [0x20, 0x01, 0x81],
623 [0x20, 0x01, 0x82],
624 [0x20, 0x01, 0x84],
625 [0x20, 0x02],
626 ])
628 def test_get_blockchain_state_error_flags(self):
629 self.dongle.exchange.side_effect = [
630 bytes([0, 0, 0x01, 0x01]) +
631 bytes.fromhex("11"*32), # Response to get best_block
632 bytes([0, 0, 0x01, 0x02]) +
633 bytes.fromhex("22"*32), # Response to get newest_valid_block
634 bytes([0, 0, 0x01, 0x03]) +
635 bytes.fromhex("33"*32), # Response to get ancestor_block
636 bytes([0, 0, 0x01, 0x05]) +
637 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root
638 bytes([0, 0, 0x01, 0x81]) +
639 bytes.fromhex("55"*32), # Response to get ancestor_receipts_root
640 bytes([0, 0, 0x01, 0x82]) +
641 bytes.fromhex("66"*32), # Response to get ancestor_receipts_root
642 bytes([0, 0, 0x01, 0x84]) +
643 bytes.fromhex("77"*32), # Response to get ancestor_receipts_root
644 bytes([0, 0, 0x02, 0xFF]), # Response to get difficulty
645 bytes([0, 0, 0x04]), # Response to get flags
646 ]
648 with self.assertRaises(HSM2DongleError):
649 self.hsm2dongle.get_blockchain_state()
651 self.assert_exchange([
652 [0x20, 0x01, 0x01],
653 [0x20, 0x01, 0x02],
654 [0x20, 0x01, 0x03],
655 [0x20, 0x01, 0x05],
656 [0x20, 0x01, 0x81],
657 [0x20, 0x01, 0x82],
658 [0x20, 0x01, 0x84],
659 [0x20, 0x02],
660 [0x20, 0x03],
661 ])
663 def test_reset_advance_blockchain_ok(self):
664 self.dongle.exchange.side_effect = [
665 bytes([0, 0, 0x02]), # Response
666 ]
667 self.assertTrue(self.hsm2dongle.reset_advance_blockchain())
669 self.assert_exchange([
670 [0x21, 0x01],
671 ])
673 def test_reset_advance_blockchain_invalid_response(self):
674 self.dongle.exchange.side_effect = [
675 bytes([0, 0, 0xAA]), # Response
676 ]
677 with self.assertRaises(HSM2DongleError):
678 self.hsm2dongle.reset_advance_blockchain()
680 self.assert_exchange([
681 [0x21, 0x01],
682 ])
684 def test_reset_advance_blockchain_exception(self):
685 self.dongle.exchange.side_effect = [CommException("a-message")]
686 with self.assertRaises(HSM2DongleError):
687 self.hsm2dongle.reset_advance_blockchain()
689 self.assert_exchange([
690 [0x21, 0x01],
691 ])
694class TestHSM2DongleAdvanceBlockchain(TestHSM2DongleBase):
695 def setup_mocks(self,
696 mmplsize_mock,
697 get_cb_txn_mock,
698 cb_txn_get_hash_mock,
699 gbh_mock):
700 mmplsize_mock.side_effect = lambda h: len(h)//8
701 get_cb_txn_mock.side_effect = lambda h: {"cb_txn": h}
702 cb_txn_get_hash_mock.side_effect = lambda h: \
703 (bytes([len(h["cb_txn"])//5])*4).hex()
704 gbh_mock.return_value = "00"
706 @parameterized.expand([
707 ("partial_v2.0.x", 0x05, 2),
708 ("total_v2.0.x", 0x06, 1),
709 ("partial_v2.1.x", 0x05, 2),
710 ("total_v2.1.x", 0x06, 1),
711 ])
712 @patch("ledger.hsm2dongle.get_block_hash")
713 @patch("ledger.hsm2dongle.coinbase_tx_get_hash")
714 @patch("ledger.hsm2dongle.get_coinbase_txn")
715 @patch("ledger.hsm2dongle.rlp_mm_payload_size")
716 def test_advance_blockchain_ok(
717 self,
718 _,
719 device_response,
720 expected_response,
721 mmplsize_mock,
722 get_cb_txn_mock,
723 cb_txn_get_hash_mock,
724 gbh_mock,
725 ):
726 self.setup_mocks(mmplsize_mock,
727 get_cb_txn_mock,
728 cb_txn_get_hash_mock,
729 gbh_mock)
730 brothers_spec = [
731 # (brother list of brother bytes, chunk size)
732 ([self.buf(190), self.buf(100)], 90),
733 None, # 2nd block has no brothers
734 ([self.buf(130)], 60),
735 ]
736 blocks_spec = [
737 # (block bytes, chunk size, brothers)
738 (self.buf(300), 80, brothers_spec[0]),
739 (self.buf(250), 100, brothers_spec[1]),
740 (self.buf(140), 50, brothers_spec[2]),
741 ]
743 self.dongle.exchange.side_effect = [
744 bs for excs in map(self.spec_to_exchange, blocks_spec)
745 for bs in excs
746 ] + [bytes([0, 0, device_response])] # Success response
748 blocks_hex = list(map(lambda bs: bs[0].hex(), blocks_spec))
749 brothers_list = list(map(
750 lambda bs: list(map(
751 lambda b: b.hex(), bs[0])) if bs else [],
752 brothers_spec))
753 self.assertEqual(
754 (True, expected_response),
755 self.hsm2dongle.advance_blockchain(blocks_hex, brothers_list),
756 )
758 self.assert_exchange([
759 [0x10, 0x02, 0x00, 0x00, 0x00, 0x03], # Init, 3 blocks
760 [0x10, 0x03, 0x00, 0x4B] +
761 [0x78, 0x78, 0x78, 0x78], # Blk #1 meta
762 [0x10, 0x04] + list(blocks_spec[0][0][80*0:80*1]), # Blk #1 chunk
763 [0x10, 0x04] + list(blocks_spec[0][0][80*1:80*2]), # Blk #1 chunk
764 [0x10, 0x04] + list(blocks_spec[0][0][80*2:80*3]), # Blk #1 chunk
765 [0x10, 0x04] + list(blocks_spec[0][0][80*3:80*4]), # Blk #1 chunk
766 [0x10, 0x07, 0x02], # Blk #1 brother count
767 [0x10, 0x08, 0x00, 0x2f, 0x4c, 0x4c, 0x4c, 0x4c], # Blk #1 bro #1 meta
768 [0x10, 0x09] + list(brothers_spec[0][0][0][90*0:90*1]), # Blk #1 bro #1 chunk
769 [0x10, 0x09] + list(brothers_spec[0][0][0][90*1:90*2]), # Blk #1 bro #1 chunk
770 [0x10, 0x09] + list(brothers_spec[0][0][0][90*2:90*3]), # Blk #1 bro #1 chunk
771 [0x10, 0x08, 0x00, 0x19, 0x28, 0x28, 0x28, 0x28], # Blk #1 bro #2 meta
772 [0x10, 0x09] + list(brothers_spec[0][0][1][90*0:90*1]), # Blk #1 bro #2 chunk
773 [0x10, 0x09] + list(brothers_spec[0][0][1][90*1:90*2]), # Blk #1 bro #2 chunk
774 [0x10, 0x03, 0x00, 0x3E] +
775 [0x64, 0x64, 0x64, 0x64], # Blk #2 meta
776 [0x10, 0x04] + list(blocks_spec[1][0][100*0:100*1]), # Blk #2 chunk
777 [0x10, 0x04] + list(blocks_spec[1][0][100*1:100*2]), # Blk #2 chunk
778 [0x10, 0x04] + list(blocks_spec[1][0][100*2:100*3]), # Blk #2 chunk
779 [0x10, 0x07, 0x00], # Blk #2 brother count
780 [0x10, 0x03, 0x00, 0x23] +
781 [0x38, 0x38, 0x38, 0x38], # Blk #3 meta
782 [0x10, 0x04] + list(blocks_spec[2][0][50*0:50*1]), # Blk #3 chunk
783 [0x10, 0x04] + list(blocks_spec[2][0][50*1:50*2]), # Blk #3 chunk
784 [0x10, 0x04] + list(blocks_spec[2][0][50*2:50*3]), # Blk #3 chunk
785 [0x10, 0x07, 0x01], # Blk #3 brother count
786 [0x10, 0x08, 0x00, 0x20, 0x34, 0x34, 0x34, 0x34], # Blk #3 bro #1 meta
787 [0x10, 0x09] + list(brothers_spec[2][0][0][60*0:60*1]), # Blk #3 bro #1 chunk
788 [0x10, 0x09] + list(brothers_spec[2][0][0][60*1:60*2]), # Blk #3 bro #1 chunk
789 [0x10, 0x09] + list(brothers_spec[2][0][0][60*2:60*3]), # Blk #3 bro #1 chunk
790 ])
792 @parameterized.expand(TestHSM2DongleBase.CHUNK_ERROR_MAPPINGS)
793 @patch("ledger.hsm2dongle.get_block_hash")
794 @patch("ledger.hsm2dongle.coinbase_tx_get_hash")
795 @patch("ledger.hsm2dongle.get_coinbase_txn")
796 @patch("ledger.hsm2dongle.rlp_mm_payload_size")
797 def test_advance_blockchain_chunk_error_result(
798 self,
799 _,
800 error_code,
801 response,
802 mmplsize_mock,
803 get_cb_txn_mock,
804 cb_txn_get_hash_mock,
805 gbh_mock,
806 ):
807 self.setup_mocks(mmplsize_mock,
808 get_cb_txn_mock,
809 cb_txn_get_hash_mock,
810 gbh_mock)
811 brothers_spec = [
812 # (brother list of brother bytes, chunk size)
813 ([self.buf(190), self.buf(100)], 90),
814 None, # 2nd block has no brothers
815 ([self.buf(130)], 60),
816 ]
817 blocks_spec = [
818 # (block bytes, chunk size, brothers)
819 (self.buf(300), 80, brothers_spec[0]),
820 (self.buf(250), 100, brothers_spec[1]),
821 (self.buf(140), 50, brothers_spec[2]),
822 ]
824 side_effect = [
825 bs for excs in map(self.spec_to_exchange, blocks_spec)
826 for bs in excs
827 ]
829 # Make the second chunk of the second block fail
830 # First block meta & chunks & bro metas & chunks
831 # + second block meta & first & second chunk
832 exchange_index = (
833 (1 + 300//80 + 1) + 1 + (1 + 190//90 + 1) + (1 + 100//90 + 1) + 3
834 )
836 if type(error_code) == bytes:
837 side_effect[exchange_index] = error_code
838 else:
839 side_effect[exchange_index] = CommException("a-message", error_code)
840 side_effect = side_effect[:exchange_index + 1]
841 self.dongle.exchange.side_effect = side_effect
843 blocks_hex = list(map(lambda bs: bs[0].hex(), blocks_spec))
844 brothers_list = list(map(
845 lambda bs: list(map(
846 lambda b: b.hex(), bs[0])) if bs else [],
847 brothers_spec))
849 self.assertEqual(
850 (False, response),
851 self.hsm2dongle.advance_blockchain(blocks_hex, brothers_list),
852 )
854 self.assert_exchange([
855 [0x10, 0x02, 0x00, 0x00, 0x00, 0x03], # Init, 3 blocks
856 [0x10, 0x03, 0x00, 0x4B] +
857 [0x78, 0x78, 0x78, 0x78], # Blk #1 meta
858 [0x10, 0x04] + list(blocks_spec[0][0][80*0:80*1]), # Blk #1 chunk
859 [0x10, 0x04] + list(blocks_spec[0][0][80*1:80*2]), # Blk #1 chunk
860 [0x10, 0x04] + list(blocks_spec[0][0][80*2:80*3]), # Blk #1 chunk
861 [0x10, 0x04] + list(blocks_spec[0][0][80*3:80*4]), # Blk #1 chunk
862 [0x10, 0x07, 0x02], # Blk #1 brother count
863 [0x10, 0x08, 0x00, 0x2f, 0x4c, 0x4c, 0x4c, 0x4c], # Blk #1 bro #1 meta
864 [0x10, 0x09] + list(brothers_spec[0][0][0][90*0:90*1]), # Blk #1 bro #1 chunk
865 [0x10, 0x09] + list(brothers_spec[0][0][0][90*1:90*2]), # Blk #1 bro #1 chunk
866 [0x10, 0x09] + list(brothers_spec[0][0][0][90*2:90*3]), # Blk #1 bro #1 chunk
867 [0x10, 0x08, 0x00, 0x19, 0x28, 0x28, 0x28, 0x28], # Blk #1 bro #2 meta
868 [0x10, 0x09] + list(brothers_spec[0][0][1][90*0:90*1]), # Blk #1 bro #2 chunk
869 [0x10, 0x09] + list(brothers_spec[0][0][1][90*1:90*2]), # Blk #1 bro #2 chunk
870 [0x10, 0x03, 0x00, 0x3E] +
871 [0x64, 0x64, 0x64, 0x64], # Blk #2 meta
872 [0x10, 0x04] + list(blocks_spec[1][0][100*0:100*1]), # Blk #2 chunk
873 [0x10, 0x04] + list(blocks_spec[1][0][100*1:100*2]), # Blk #2 chunk
874 ])
876 @parameterized.expand([
877 ("prot_invalid", 0x6B87, -3),
878 ("unexpected", 0x6BFF, -10),
879 ("error_response", bytes([0, 0, 0xFF]), -10),
880 ])
881 @patch("ledger.hsm2dongle.get_block_hash")
882 @patch("ledger.hsm2dongle.coinbase_tx_get_hash")
883 @patch("ledger.hsm2dongle.get_coinbase_txn")
884 @patch("ledger.hsm2dongle.rlp_mm_payload_size")
885 def test_advance_blockchain_metadata_error_result(
886 self,
887 _,
888 error_code,
889 response,
890 mmplsize_mock,
891 get_cb_txn_mock,
892 cb_txn_get_hash_mock,
893 gbh_mock,
894 ):
895 self.setup_mocks(mmplsize_mock,
896 get_cb_txn_mock,
897 cb_txn_get_hash_mock,
898 gbh_mock)
899 brothers_spec = [
900 # (brother list of brother bytes, chunk size)
901 ([self.buf(190), self.buf(100)], 90),
902 None, # 2nd block has no brothers
903 ([self.buf(130)], 60),
904 ]
905 blocks_spec = [
906 # (block bytes, chunk size, brothers)
907 (self.buf(300), 80, brothers_spec[0]),
908 (self.buf(250), 100, brothers_spec[1]),
909 (self.buf(140), 50, brothers_spec[2]),
910 ]
912 side_effect = [
913 bs for excs in map(self.spec_to_exchange, blocks_spec)
914 for bs in excs
915 ]
917 # Make the metadata of the third block fail
918 # First block meta & chunks & bro metas & chunks
919 # + second block meta & chunks & bro meta
920 # + third block meta
921 exchange_index = (
922 (1 + 300//80 + 1) + 1 + (1 + 190//90 + 1) + (1 + 100//90 + 1) +
923 (1 + 250//100 + 1) + 1 +
924 1
925 )
927 if type(error_code) == bytes:
928 side_effect[exchange_index] = error_code
929 else:
930 side_effect[exchange_index] = CommException("a-message", error_code)
931 side_effect = side_effect[:exchange_index + 1]
932 self.dongle.exchange.side_effect = side_effect
934 blocks_hex = list(map(lambda bs: bs[0].hex(), blocks_spec))
936 brothers_list = list(map(
937 lambda bs: list(map(
938 lambda b: b.hex(), bs[0])) if bs else [],
939 brothers_spec))
941 self.assertEqual(
942 (False, response),
943 self.hsm2dongle.advance_blockchain(blocks_hex, brothers_list),
944 )
946 self.assert_exchange([
947 [0x10, 0x02, 0x00, 0x00, 0x00, 0x03], # Init, 3 blocks
948 [0x10, 0x03, 0x00, 0x4B] +
949 [0x78, 0x78, 0x78, 0x78], # Blk #1 meta
950 [0x10, 0x04] + list(blocks_spec[0][0][80*0:80*1]), # Blk #1 chunk
951 [0x10, 0x04] + list(blocks_spec[0][0][80*1:80*2]), # Blk #1 chunk
952 [0x10, 0x04] + list(blocks_spec[0][0][80*2:80*3]), # Blk #1 chunk
953 [0x10, 0x04] + list(blocks_spec[0][0][80*3:80*4]), # Blk #1 chunk
954 [0x10, 0x07, 0x02], # Blk #1 brother count
955 [0x10, 0x08, 0x00, 0x2f, 0x4c, 0x4c, 0x4c, 0x4c], # Blk #1 bro #1 meta
956 [0x10, 0x09] + list(brothers_spec[0][0][0][90*0:90*1]), # Blk #1 bro #1 chunk
957 [0x10, 0x09] + list(brothers_spec[0][0][0][90*1:90*2]), # Blk #1 bro #1 chunk
958 [0x10, 0x09] + list(brothers_spec[0][0][0][90*2:90*3]), # Blk #1 bro #1 chunk
959 [0x10, 0x08, 0x00, 0x19, 0x28, 0x28, 0x28, 0x28], # Blk #1 bro #2 meta
960 [0x10, 0x09] + list(brothers_spec[0][0][1][90*0:90*1]), # Blk #1 bro #2 chunk
961 [0x10, 0x09] + list(brothers_spec[0][0][1][90*1:90*2]), # Blk #1 bro #2 chunk
962 [0x10, 0x03, 0x00, 0x3E] +
963 [0x64, 0x64, 0x64, 0x64], # Blk #2 meta
964 [0x10, 0x04] + list(blocks_spec[1][0][100*0:100*1]), # Blk #2 chunk
965 [0x10, 0x04] + list(blocks_spec[1][0][100*1:100*2]), # Blk #2 chunk
966 [0x10, 0x04] + list(blocks_spec[1][0][100*2:100*3]), # Blk #2 chunk
967 [0x10, 0x07, 0x00], # Blk #2 brother count
968 [0x10, 0x03, 0x00, 0x23] +
969 [0x38, 0x38, 0x38, 0x38], # Blk #3 meta
970 ])
972 @patch("ledger.hsm2dongle.rlp_mm_payload_size")
973 def test_advance_blockchain_metadata_error_generating(self, mmplsize_mock):
974 mmplsize_mock.side_effect = ValueError()
975 self.dongle.exchange.side_effect = [bytes([0, 0, 0x03])]
977 self.assertEqual(
978 (False, -2),
979 self.hsm2dongle.advance_blockchain(["first-block", "second-block"],
980 [[], []]),
981 )
983 self.assert_exchange([
984 [0x10, 0x02, 0x00, 0x00, 0x00, 0x02], # Init, 2 blocks
985 ])
986 self.assertEqual([call("first-block")], mmplsize_mock.call_args_list)
988 @parameterized.expand([
989 ("prot_invalid", CommException("a-message", 0x6B87), -1),
990 ("unexpected", CommException("a-message", 0x6BFF), -10),
991 ("invalid_response", bytes([0, 0, 0xFF]), -10),
992 ])
993 def test_advance_blockchain_init_error(self, _, error, response):
994 self.dongle.exchange.side_effect = [error]
996 self.assertEqual(
997 (False, response),
998 self.hsm2dongle.advance_blockchain(["first-block", "second-block"],
999 [[], []]),
1000 )
1002 self.assert_exchange([
1003 [0x10, 0x02, 0x00, 0x00, 0x00, 0x02], # Init, 2 blocks
1004 ])
1007class TestHSM2DongleUpdateAncestor(TestHSM2DongleBase):
1008 @patch("ledger.hsm2dongle.remove_mm_fields_if_present")
1009 @patch("ledger.hsm2dongle.rlp_mm_payload_size")
1010 def test_update_ancestor_ok(self, mmplsize_mock, rmvflds_mock):
1011 rmvflds_mock.side_effect = lambda h: h[:-bytes.fromhex(h)[-1]*2]
1012 mmplsize_mock.side_effect = lambda h: len(h)//8
1013 blocks_spec = [
1014 # (block bytes, chunk size)
1015 (
1016 self.buf(300) +
1017 bytes.fromhex("aabbccddeeff0011220a"),
1018 80,
1019 ),
1020 (
1021 self.buf(250) +
1022 bytes.fromhex("1122334405"),
1023 100,
1024 ),
1025 (
1026 self.buf(130) +
1027 bytes.fromhex("334455aabbccdd2211982311aacdfe10"),
1028 50,
1029 ),
1030 ]
1032 self.dongle.exchange.side_effect = [
1033 bs for excs in map(lambda s: self.spec_to_exchange(s, trim=True), blocks_spec)
1034 for bs in excs
1035 ] + [bytes([0, 0, 0x05])] # Success response
1037 blocks_hex = list(map(lambda bs: bs[0].hex(), blocks_spec))
1038 self.assertEqual((True, 1),
1039 self.hsm2dongle.update_ancestor(blocks_hex))
1041 self.assert_exchange([
1042 [0x30, 0x02, 0x00, 0x00, 0x00, 0x03], # Init, 3 blocks
1043 [0x30, 0x03, 0x00, 0x4B], # Block #1 meta
1044 [0x30, 0x04] + list(blocks_spec[0][0][80*0:80*1]), # Block #1 chunk
1045 [0x30, 0x04] + list(blocks_spec[0][0][80*1:80*2]), # Block #1 chunk
1046 [0x30, 0x04] + list(blocks_spec[0][0][80*2:80*3]), # Block #1 chunk
1047 [0x30, 0x04] +
1048 list(blocks_spec[0][0][80*3:80*4][:-blocks_spec[0][0][-1]]), # Block #1 chunk
1049 [0x30, 0x03, 0x00, 0x3E], # Block #2 meta
1050 [0x30, 0x04] + list(blocks_spec[1][0][100*0:100*1]), # Block #2 chunk
1051 [0x30, 0x04] + list(blocks_spec[1][0][100*1:100*2]), # Block #2 chunk
1052 [0x30, 0x04] +
1053 list(blocks_spec[1][0][100*2:100 *
1054 3][:-blocks_spec[1][0][-1]]), # Block #2 chunk
1055 [0x30, 0x03, 0x00, 0x20], # Block #3 meta
1056 [0x30, 0x04] + list(blocks_spec[2][0][50*0:50*1]), # Block #2 chunk
1057 [0x30, 0x04] + list(blocks_spec[2][0][50*1:50*2]), # Block #3 chunk
1058 [0x30, 0x04] +
1059 list(blocks_spec[2][0][50*2:50*3][:-blocks_spec[2][0][-1]]), # Block #3 chunk
1060 ])
1062 @parameterized.expand([
1063 ("prot_invalid", 0x6B87, -4),
1064 ("rlp_invalid", 0x6B88, -5),
1065 ("block_too_old", 0x6B89, -5),
1066 ("block_too_short", 0x6B8A, -5),
1067 ("parent_hash_invalid", 0x6B8B, -5),
1068 ("receipt_root_invalid", 0x6B8C, -5),
1069 ("block_num_invalid", 0x6B8D, -5),
1070 ("btc_header_invalid", 0x6B90, -5),
1071 ("mm_rlp_len_mismatch", 0x6B93, -5),
1072 ("buffer_overflow", 0x6B99, -5),
1073 ("chain_mismatch", 0x6B9A, -6),
1074 ("ancestor_tip_mismatch", 0x6B9C, -7),
1075 ("unexpected", 0x6BFF, -10),
1076 ("error_response", bytes([0, 0, 0xFF]), -10),
1077 ])
1078 @patch("ledger.hsm2dongle.remove_mm_fields_if_present")
1079 @patch("ledger.hsm2dongle.rlp_mm_payload_size")
1080 def test_update_ancestor_chunk_error_result(self, _, error_code, response,
1081 mmplsize_mock, rmvflds_mock):
1082 rmvflds_mock.side_effect = lambda h: h
1083 mmplsize_mock.side_effect = lambda h: len(h)//8
1084 blocks_spec = [
1085 # (block bytes, chunk size)
1086 (self.buf(300), 80),
1087 (self.buf(250), 100),
1088 (self.buf(140), 50),
1089 ]
1091 side_effect = [
1092 bs for excs in map(self.spec_to_exchange, blocks_spec)
1093 for bs in excs
1094 ]
1095 # Make the second chunk of the second block fail
1096 exchange_index = (
1097 1 + (300//80 + 2) + 2
1098 ) # Init + first block meta & chunks + second block meta & first chunk
1099 if type(error_code) == bytes:
1100 side_effect[exchange_index] = error_code
1101 else:
1102 side_effect[exchange_index] = CommException("a-message", error_code)
1103 side_effect = side_effect[:exchange_index + 1]
1104 self.dongle.exchange.side_effect = side_effect
1106 blocks_hex = list(map(lambda bs: bs[0].hex(), blocks_spec))
1107 self.assertEqual(
1108 (False, response),
1109 self.hsm2dongle.update_ancestor(blocks_hex),
1110 )
1112 self.assert_exchange([
1113 [0x30, 0x02, 0x00, 0x00, 0x00, 0x03], # Init, 3 blocks
1114 [0x30, 0x03, 0x00, 0x4B], # Block #1 meta
1115 [0x30, 0x04] + list(blocks_spec[0][0][80*0:80*1]), # Block #1 chunk
1116 [0x30, 0x04] + list(blocks_spec[0][0][80*1:80*2]), # Block #1 chunk
1117 [0x30, 0x04] + list(blocks_spec[0][0][80*2:80*3]), # Block #1 chunk
1118 [0x30, 0x04] + list(blocks_spec[0][0][80*3:80*4]), # Block #1 chunk
1119 [0x30, 0x03, 0x00, 0x3E], # Block #2 meta
1120 [0x30, 0x04] + list(blocks_spec[1][0][100*0:100*1]), # Block #2 chunk
1121 [0x30, 0x04] + list(blocks_spec[1][0][100*1:100*2]), # Block #2 chunk
1122 ])
1124 @parameterized.expand([
1125 ("prot_invalid", 0x6B87, -3),
1126 ("unexpected", 0x6BFF, -10),
1127 ("error_response", bytes([0, 0, 0xFF]), -10),
1128 ])
1129 @patch("ledger.hsm2dongle.remove_mm_fields_if_present")
1130 @patch("ledger.hsm2dongle.rlp_mm_payload_size")
1131 def test_update_ancestor_metadata_error_result(self, _, error_code, response,
1132 mmplsize_mock, rmvflds_mock):
1133 rmvflds_mock.side_effect = lambda h: h
1134 mmplsize_mock.side_effect = lambda h: len(h)//8
1135 blocks_spec = [
1136 # (block bytes, chunk size)
1137 (self.buf(300), 80),
1138 (self.buf(250), 100),
1139 (self.buf(140), 50),
1140 ]
1142 side_effect = [
1143 bs for excs in map(self.spec_to_exchange, blocks_spec)
1144 for bs in excs
1145 ]
1146 # Make the metadata of the third block fail
1147 exchange_index = (
1148 1 + (300//80 + 2) + (250//100 + 2)
1149 ) # Init + first and second block meta & chunks + third block meta
1150 if type(error_code) == bytes:
1151 side_effect[exchange_index] = error_code
1152 else:
1153 side_effect[exchange_index] = CommException("a-message", error_code)
1154 side_effect = side_effect[:exchange_index + 1]
1155 self.dongle.exchange.side_effect = side_effect
1157 blocks_hex = list(map(lambda bs: bs[0].hex(), blocks_spec))
1158 self.assertEqual(
1159 (False, response),
1160 self.hsm2dongle.update_ancestor(blocks_hex),
1161 )
1163 self.assert_exchange([
1164 [0x30, 0x02, 0x00, 0x00, 0x00, 0x03], # Init, 3 blocks
1165 [0x30, 0x03, 0x00, 0x4B], # Block #1 meta
1166 [0x30, 0x04] + list(blocks_spec[0][0][80*0:80*1]), # Block #1 chunk
1167 [0x30, 0x04] + list(blocks_spec[0][0][80*1:80*2]), # Block #1 chunk
1168 [0x30, 0x04] + list(blocks_spec[0][0][80*2:80*3]), # Block #1 chunk
1169 [0x30, 0x04] + list(blocks_spec[0][0][80*3:80*4]), # Block #1 chunk
1170 [0x30, 0x03, 0x00, 0x3E], # Block #2 meta
1171 [0x30, 0x04] + list(blocks_spec[1][0][100*0:100*1]), # Block #2 chunk
1172 [0x30, 0x04] + list(blocks_spec[1][0][100*1:100*2]), # Block #2 chunk
1173 [0x30, 0x04] + list(blocks_spec[1][0][100*2:100*3]), # Block #2 chunk
1174 [0x30, 0x03, 0x00, 0x23], # Block #3 meta
1175 ])
1177 @patch("ledger.hsm2dongle.remove_mm_fields_if_present")
1178 @patch("ledger.hsm2dongle.rlp_mm_payload_size")
1179 def test_update_ancestor_metadata_error_generating(self, mmplsize_mock, rmvflds_mock):
1180 rmvflds_mock.side_effect = lambda h: h
1181 mmplsize_mock.side_effect = ValueError()
1182 self.dongle.exchange.side_effect = [bytes([0, 0, 0x03])]
1184 self.assertEqual(
1185 (False, -2),
1186 self.hsm2dongle.update_ancestor(["first-block", "second-block"]),
1187 )
1189 self.assert_exchange([
1190 [0x30, 0x02, 0x00, 0x00, 0x00, 0x02], # Init, 2 blocks
1191 ])
1192 self.assertEqual([call("first-block")], mmplsize_mock.call_args_list)
1194 @parameterized.expand([
1195 ("prot_invalid", CommException("a-message", 0x6B87), -1),
1196 ("unexpected", CommException("a-message", 0x6BFF), -10),
1197 ("invalid_response", bytes([0, 0, 0xFF]), -10),
1198 ])
1199 @patch("ledger.hsm2dongle.remove_mm_fields_if_present")
1200 def test_update_ancestor_init_error(self, _, error, response, rmvflds_mock):
1201 rmvflds_mock.side_effect = lambda h: h
1202 self.dongle.exchange.side_effect = [error]
1204 self.assertEqual(
1205 (False, response),
1206 self.hsm2dongle.update_ancestor(["first-block", "second-block"]),
1207 )
1209 self.assert_exchange([
1210 [0x30, 0x02, 0x00, 0x00, 0x00, 0x02], # Init, 2 blocks
1211 ])
1213 @patch("ledger.hsm2dongle.remove_mm_fields_if_present")
1214 def test_update_ancestor_remove_mmfields_exception(self, rmvflds_mock):
1215 rmvflds_mock.side_effect = ValueError("an error")
1217 self.assertEqual(
1218 (False, -8),
1219 self.hsm2dongle.update_ancestor(["first-block", "second-block"]),
1220 )
1222 self.assert_exchange([])
1224 def test_authorize_signer_ok(self):
1225 self.dongle.exchange.side_effect = [
1226 bytes(), # Response to hash, iteration - doesn't matter
1227 bytes.fromhex("aaaaaa01"), # Response to first signature, MORE
1228 bytes.fromhex("aaaaaa02"), # Response to second signature, OK
1229 ]
1231 self.assertTrue(self.hsm2dongle.authorize_signer(Mock(
1232 signer_version=Mock(hash="ee"*32, iteration=0x4321),
1233 signatures=["aa"*20, "bb"*25]
1234 )))
1236 self.assert_exchange([
1237 [0x51, 0x01] + [0xee]*32 + [0x43, 0x21], # Sigver, hash plus iteration
1238 [0x51, 0x02] + [0xaa]*20, # Signature #1
1239 [0x51, 0x02] + [0xbb]*25, # Signature #2
1240 ])
1242 def test_authorize_signer_ok_first_sig(self):
1243 self.dongle.exchange.side_effect = [
1244 bytes(), # Response to hash, iteration - doesn't matter
1245 bytes.fromhex("aaaaaa02"), # Response to first signature, OK
1246 ]
1248 self.assertTrue(self.hsm2dongle.authorize_signer(Mock(
1249 signer_version=Mock(hash="ee"*32, iteration=0x4321),
1250 signatures=["aa"*20, "bb"*25]
1251 )))
1253 self.assert_exchange([
1254 [0x51, 0x01] + [0xee]*32 + [0x43, 0x21], # Sigver, hash plus iteration
1255 [0x51, 0x02] + [0xaa]*20, # Signature #1
1256 ])
1258 def test_authorize_signer_sigver_error(self):
1259 self.dongle.exchange.side_effect = [
1260 CommException("an-error"), # Response to hash, iteration - error
1261 ]
1263 with self.assertRaises(HSM2DongleError):
1264 self.hsm2dongle.authorize_signer(Mock(
1265 signer_version=Mock(hash="ee"*32, iteration=0x4321),
1266 signatures=["aa"*20, "bb"*25]
1267 ))
1269 self.assert_exchange([
1270 [0x51, 0x01] + [0xee]*32 + [0x43, 0x21], # Sigver, hash plus iteration
1271 ])
1273 def test_authorize_signer_signature_error(self):
1274 self.dongle.exchange.side_effect = [
1275 bytes(), # Response to hash, iteration - doesn't matter
1276 bytes.fromhex("aaaaaa01"), # Response to first signature, MORE
1277 CommException("an-error"), # Response to second signature, ERROR
1278 ]
1280 with self.assertRaises(HSM2DongleError):
1281 self.hsm2dongle.authorize_signer(Mock(
1282 signer_version=Mock(hash="ee"*32, iteration=0x4321),
1283 signatures=["aa"*20, "bb"*25]
1284 ))
1286 self.assert_exchange([
1287 [0x51, 0x01] + [0xee]*32 + [0x43, 0x21], # Sigver, hash plus iteration
1288 [0x51, 0x02] + [0xaa]*20, # Signature #1
1289 [0x51, 0x02] + [0xbb]*25, # Signature #2
1290 ])
1292 def test_authorize_not_enough_signatures(self):
1293 self.dongle.exchange.side_effect = [
1294 bytes(), # Response to hash, iteration - doesn't matter
1295 bytes.fromhex("aaaaaa01"), # Response to first signature, MORE
1296 bytes.fromhex("aaaaaa01"), # Response to second signature, MORE
1297 ]
1299 with self.assertRaises(HSM2DongleError):
1300 self.hsm2dongle.authorize_signer(Mock(
1301 signer_version=Mock(hash="ee"*32, iteration=0x4321),
1302 signatures=["aa"*20, "bb"*25]
1303 ))
1305 self.assert_exchange([
1306 [0x51, 0x01] + [0xee]*32 + [0x43, 0x21], # Sigver, hash plus iteration
1307 [0x51, 0x02] + [0xaa]*20, # Signature #1
1308 [0x51, 0x02] + [0xbb]*25, # Signature #2
1309 ])