Coverage for tests/ledger/test_hsm2dongle.py: 98%
306 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23from unittest import TestCase
24from unittest.mock import Mock, patch, call
25from parameterized import parameterized
26from ledger.hsm2dongle import (
27 HSM2Dongle,
28 HSM2DongleError,
29 HSM2DongleCommError,
30 HSM2DongleTimeoutError,
31 HSM2DongleErrorResult,
32)
33from sgx.hsm2dongle import HSM2DongleSGX
34from ledger.version import HSM2FirmwareVersion
35from ledgerblue.commException import CommException
36from enum import Enum
38import logging
40logging.disable(logging.CRITICAL)
43class HSM2DongleTestMode(Enum):
44 Ledger = "ledger"
45 SGX = "sgx"
48class TestHSM2DongleBase(TestCase):
49 DONGLE_EXCHANGE_TIMEOUT = 10
51 CHUNK_ERROR_MAPPINGS = [
52 ("prot_invalid", 0x6B87, -4),
53 ("rlp_invalid", 0x6B88, -5),
54 ("block_too_old", 0x6B89, -5),
55 ("block_too_short", 0x6B8A, -5),
56 ("parent_hash_invalid", 0x6B8B, -5),
57 ("block_num_invalid", 0x6B8D, -5),
58 ("block_diff_invalid", 0x6B8E, -5),
59 ("umm_root_invalid", 0x6B8F, -5),
60 ("btc_header_invalid", 0x6B90, -5),
61 ("merkle_proof_invalid", 0x6B91, -5),
62 ("btc_cb_txn_invalid", 0x6B92, -6),
63 ("mm_rlp_len_mismatch", 0x6B93, -5),
64 ("btc_diff_mismatch", 0x6B94, -6),
65 ("merkle_proof_mismatch", 0x6B95, -6),
66 ("mm_hash_mismatch", 0x6B96, -6),
67 ("merkle_proof_overflow", 0x6B97, -5),
68 ("cb_txn_overflow", 0x6B98, -5),
69 ("buffer_overflow", 0x6B99, -5),
70 ("chain_mismatch", 0x6B9A, -7),
71 ("total_diff_overflow", 0x6B9B, -8),
72 ("cb_txn_hash_mismatch", 0x6B9D, -6),
73 ("brothers_too_many", 0x6B9E, -9),
74 ("brother_parent_mismatch", 0x6B9F, -9),
75 ("brother_same_as_block", 0x6BA0, -9),
76 ("brother_order_invalid", 0x6BA1, -9),
77 ("unexpected", 0x6BFF, -10),
78 ("error_response", bytes([0, 0, 0xFF]), -10),
79 ]
81 def get_test_mode(self):
82 return HSM2DongleTestMode.Ledger
84 @patch("ledger.hsm2dongle_tcp.getDongle")
85 @patch("ledger.hsm2dongle.getDongle")
86 def setUp(self, getDongleMock, getDongleTCPMock):
87 if self.get_test_mode() == HSM2DongleTestMode.Ledger:
88 self.dongle = Mock()
89 self.getDongleMock = getDongleMock
90 self.getDongleMock.return_value = self.dongle
91 self.hsm2dongle = HSM2Dongle("a-debug-value")
92 self.getDongleMock.assert_not_called()
93 self.hsm2dongle.connect()
94 self.getDongleMock.assert_called_with("a-debug-value")
95 getDongleTCPMock.assert_not_called()
96 elif self.get_test_mode() == HSM2DongleTestMode.SGX:
97 self.dongle = Mock()
98 self.getDongleMock = getDongleTCPMock
99 self.getDongleMock.return_value = self.dongle
100 self.hsm2dongle = HSM2DongleSGX("a-host", 1234, "a-debug-value")
102 self.getDongleMock.assert_not_called()
103 self.hsm2dongle.connect()
104 self.getDongleMock.assert_called_with("a-host", 1234, "a-debug-value")
105 self.assertEqual(self.hsm2dongle.dongle, self.dongle)
106 getDongleMock.assert_not_called()
107 else:
108 raise RuntimeError(f"Unknown test mode: {self.get_test_mode()}")
110 def buf(self, size):
111 return bytes(map(lambda b: b % 256, range(size)))
113 def parse_exchange_spec(self, spec, stop=None, replace=None):
114 rqs = []
115 rps = []
116 rq = True
117 stopped = False
118 for line in spec:
119 delim = ">" if rq else "<"
120 delim_pos = line.find(delim)
121 if delim_pos == -1:
122 raise RuntimeError("Invalid spec prefix")
123 name = line[:delim_pos].strip()
124 if name == stop:
125 if replace is not None:
126 (rqs if rq else rps).append(replace)
127 stopped = True
128 break
129 (rqs if rq else rps).append(
130 bytes.fromhex("80" + line[delim_pos+1:].replace(" ", ""))
131 )
132 rq = not rq
134 if stop is not None and not stopped:
135 raise RuntimeError(f"Invalid spec parsing: specified stop at '{stop}' "
136 "but exchange not found")
137 return {"requests": rqs, "responses": rps}
139 def spec_to_exchange(self, spec, trim=False):
140 trim_length = spec[0][-1] if trim else 0
141 block_size = len(spec[0]) - trim_length
142 chunk_size = spec[1]
143 exchanges = [bytes([0, 0, 0x04, chunk_size])]*(block_size//chunk_size)
144 remaining = block_size - len(exchanges)*chunk_size
145 exchanges = [bytes([0, 0, 0x03])] + exchanges + \
146 [bytes([0, 0, 0x04, remaining])]
148 # Spec has brothers?
149 if len(spec) == 3:
150 exchanges += [bytes([0, 0, 0x07])] # Request brother list metadata
151 if len(spec) == 3 and spec[2] is not None:
152 brother_count = len(spec[2][0])
153 chunk_size = spec[2][1]
154 for i in range(brother_count):
155 brother_size = len(spec[2][0][i])
156 bro_exchanges = [bytes([0, 0, 0x09, chunk_size])] * \
157 (brother_size//chunk_size)
158 remaining = brother_size - len(bro_exchanges)*chunk_size
159 exchanges += [bytes([0, 0, 0x08])] + \
160 bro_exchanges + \
161 [bytes([0, 0, 0x09, remaining])]
163 return exchanges
165 def assert_exchange(self, payloads, timeouts=None):
166 def ensure_cla(bs):
167 if bs[0] != 0x80:
168 return bytes([0x80]) + bs
169 return bs
171 if timeouts is None:
172 timeouts = [None]*len(payloads)
173 calls = list(
174 map(
175 lambda z: call(
176 ensure_cla(bytes(z[0])),
177 timeout=(z[1] if z[1] is not None else self.DONGLE_EXCHANGE_TIMEOUT),
178 ),
179 zip(payloads, timeouts),
180 ))
182 self.assertEqual(
183 len(payloads),
184 len(self.dongle.exchange.call_args_list),
185 msg="# of exchanges mismatch",
186 )
188 for i, c in enumerate(calls):
189 if c != self.dongle.exchange.call_args_list[i]:
190 print("E:", c)
191 print("A:", self.dongle.exchange.call_args_list[i])
192 self.assertEqual(
193 c,
194 self.dongle.exchange.call_args_list[i],
195 msg="%dth exchange failed" % (i + 1),
196 )
198 def do_sign_auth(self, spec):
199 return self.hsm2dongle.sign_authorized(
200 key_id=spec["keyid"],
201 rsk_tx_receipt=spec["receipt"],
202 receipt_merkle_proof=spec["mp"],
203 btc_tx=spec["tx"],
204 input_index=spec["input"],
205 sighash_computation_mode=spec["mode"],
206 witness_script=spec["ws"],
207 outpoint_value=spec["ov"],
208 )
210 def process_sign_auth_spec(self, spec, stop=None, replace=None):
211 pex = self.parse_exchange_spec(spec["exchanges"], stop=stop, replace=replace)
212 spec["requests"] = pex["requests"]
213 spec["responses"] = pex["responses"]
214 self.dongle.exchange.side_effect = spec["responses"]
215 return spec
218class TestHSM2Dongle(TestHSM2DongleBase):
219 def test_dongle_error_codes(self):
220 # Make sure enums are ok wrt signer definitions by testing a couple
221 # of arbitrary values
222 self.assertEqual(0x6B8C, self.hsm2dongle.ERR.ADVANCE.RECEIPT_ROOT_INVALID.value)
223 self.assertEqual(0x6B93, self.hsm2dongle.ERR.ADVANCE.MM_RLP_LEN_MISMATCH.value)
224 self.assertEqual(0x6BA1, self.hsm2dongle.ERR.ADVANCE.BROTHER_ORDER_INVALID.value)
225 self.assertEqual(0x6A8F, self.hsm2dongle.ERR.SIGN.INVALID_PATH)
226 self.assertEqual(
227 0x6A97,
228 self.hsm2dongle.ERR.SIGN.INVALID_SIGHASH_COMPUTATION_MODE.value
229 )
231 def test_connects_ok(self):
232 self.assertEqual([call("a-debug-value")], self.getDongleMock.call_args_list)
234 @patch("ledger.hsm2dongle.getDongle")
235 def test_connects_error_comm(self, getDongleMock):
236 getDongleMock.side_effect = CommException("a-message")
237 with self.assertRaises(HSM2DongleCommError):
238 self.hsm2dongle.connect()
240 @patch("ledger.hsm2dongle.getDongle")
241 def test_connects_error_other(self, getDongleMock):
242 getDongleMock.side_effect = ValueError()
243 with self.assertRaises(ValueError):
244 self.hsm2dongle.connect()
246 def test_get_current_mode(self):
247 self.dongle.exchange.return_value = bytes([10, 2, 30])
248 mode = self.hsm2dongle.get_current_mode()
249 self.assertEqual(2, mode)
250 self.assertEqual(self.hsm2dongle.MODE, type(mode))
251 self.assert_exchange([[0x43]])
253 def test_echo(self):
254 self.dongle.exchange.return_value = bytes([0x80, 0x02, 0x41, 0x42, 0x43])
255 self.assertTrue(self.hsm2dongle.echo())
256 self.assert_exchange([[0x02, 0x41, 0x42, 0x43]])
258 def test_echo_error(self):
259 self.dongle.exchange.return_value = bytes([1, 2, 3])
260 self.assertFalse(self.hsm2dongle.echo())
261 self.assert_exchange([[0x02, 0x41, 0x42, 0x43]])
263 def test_is_onboarded_yes(self):
264 self.dongle.exchange.return_value = bytes([0, 1, 0])
265 self.assertTrue(self.hsm2dongle.is_onboarded())
266 self.assert_exchange([[0x06]])
268 def test_is_onboarded_no(self):
269 self.dongle.exchange.return_value = bytes([0, 0, 0])
270 self.assertFalse(self.hsm2dongle.is_onboarded())
271 self.assert_exchange([[0x06]])
273 def test_onboard_ok(self):
274 self.dongle.exchange.side_effect = [bytes([0])]*(32 + 5) + [bytes([0, 2, 0])]
276 self.assertTrue(
277 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234"))
279 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(32)))
280 pin_exchanges = [[0x41, 0, 4]] + list(
281 map(lambda i: [0x41, i + 1, ord(str(i + 1))], range(4)))
282 exchanges = seed_exchanges + pin_exchanges + [[0x07]]
283 timeouts = [None]*len(exchanges)
284 timeouts[-1] = HSM2Dongle.ONBOARDING.TIMEOUT
285 self.assert_exchange(exchanges, timeouts)
287 def test_onboard_wipe_error(self):
288 self.dongle.exchange.side_effect = [bytes([0])]*(32 + 5) + [bytes([0, 1, 0])]
290 with self.assertRaises(HSM2DongleError):
291 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234")
293 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(32)))
294 pin_exchanges = [[0x41, 0, 4]] + list(
295 map(lambda i: [0x41, i + 1, ord(str(i + 1))], range(4)))
296 exchanges = seed_exchanges + pin_exchanges + [[0x07]]
297 timeouts = [None]*len(exchanges)
298 timeouts[-1] = HSM2Dongle.ONBOARDING.TIMEOUT
299 self.assert_exchange(exchanges, timeouts)
301 def test_onboard_pin_error(self):
302 self.dongle.exchange.side_effect = [bytes([0])]*(32 + 3) + [
303 CommException("an-error")
304 ]
306 with self.assertRaises(HSM2DongleError):
307 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234")
309 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(32)))
310 pin_exchanges = [[0x41, 0, 4]] + list(
311 map(lambda i: [0x41, i + 1, ord(str(i + 1))], range(3)))
312 exchanges = seed_exchanges + pin_exchanges
313 self.assert_exchange(exchanges)
315 def test_onboard_seed_error(self):
316 self.dongle.exchange.side_effect = [bytes([0])]*30 + [CommException("an-error")]
318 with self.assertRaises(HSM2DongleError):
319 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234")
321 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(31)))
322 self.assert_exchange(seed_exchanges)
324 def test_unlock_ok(self):
325 self.dongle.exchange.side_effect = [
326 bytes([0]),
327 bytes([1]),
328 bytes([2]),
329 bytes([0, 0, 1]),
330 ]
331 self.assertTrue(self.hsm2dongle.unlock(bytes([1, 2, 3])))
332 self.assert_exchange([[0x41, 0, 1], [0x41, 1, 2], [0x41, 2, 3],
333 [0xFE, 0x00, 0x00]])
335 def test_unlock_pinerror(self):
336 self.dongle.exchange.side_effect = [
337 bytes([0]),
338 bytes([1]),
339 bytes([2]),
340 bytes([0, 0, 0]),
341 ]
342 self.assertFalse(self.hsm2dongle.unlock(bytes([1, 2, 3])))
343 self.assert_exchange([[0x41, 0, 1], [0x41, 1, 2], [0x41, 2, 3],
344 [0xFE, 0x00, 0x00]])
346 def test_new_pin(self):
347 self.dongle.exchange.side_effect = [
348 bytes([0]),
349 bytes([1]),
350 bytes([2]),
351 bytes([3]),
352 bytes([4]),
353 ]
354 self.hsm2dongle.new_pin(bytes([4, 5, 6]))
355 self.assert_exchange([[0x41, 0, 3], [0x41, 1, 4], [0x41, 2, 5], [0x41, 3, 6],
356 [0x08]])
358 def test_version(self):
359 self.dongle.exchange.return_value = bytes([0, 0, 6, 7, 8])
360 version = self.hsm2dongle.get_version()
361 self.assertEqual(HSM2FirmwareVersion, type(version))
362 self.assertEqual(6, version.major)
363 self.assertEqual(7, version.minor)
364 self.assertEqual(8, version.patch)
365 self.assert_exchange([[0x06]])
367 def test_retries(self):
368 self.dongle.exchange.return_value = bytes([0, 0, 57])
369 retries = self.hsm2dongle.get_retries()
370 self.assertEqual(57, retries)
371 self.assert_exchange([[0x45]])
373 def test_exit_menu(self):
374 self.dongle.exchange.return_value = bytes([0])
375 self.hsm2dongle.exit_menu()
376 self.assert_exchange([[0xFF, 0x00, 0x00]])
378 def test_exit_menu_explicit_autoexec(self):
379 self.dongle.exchange.return_value = bytes([0])
380 self.hsm2dongle.exit_menu(autoexec=True)
381 self.assert_exchange([[0xFF, 0x00, 0x00]])
383 def test_exit_menu_no_autoexec(self):
384 self.dongle.exchange.return_value = bytes([0])
385 self.hsm2dongle.exit_menu(autoexec=False)
386 self.assert_exchange([[0xFA, 0x00, 0x00]])
388 def test_exit_app(self):
389 self.dongle.exchange.side_effect = OSError("read error")
390 with self.assertRaises(HSM2DongleCommError):
391 self.hsm2dongle.exit_app()
392 self.assert_exchange([[0xFF]])
394 def test_get_public_key_ok(self):
395 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")})
396 self.dongle.exchange.return_value = bytes.fromhex("aabbccddee")
397 self.assertEqual("aabbccddee", self.hsm2dongle.get_public_key(key_id))
398 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]])
400 def test_get_public_key_invalid_keyid(self):
401 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")})
402 self.dongle.exchange.side_effect = CommException("some message", 0x6A87)
403 with self.assertRaises(HSM2DongleErrorResult):
404 self.hsm2dongle.get_public_key(key_id)
405 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]])
407 def test_get_public_key_timeout(self):
408 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")})
409 self.dongle.exchange.side_effect = CommException("Timeout")
410 with self.assertRaises(HSM2DongleTimeoutError):
411 self.hsm2dongle.get_public_key(key_id)
412 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]])
414 def test_get_public_key_other_error(self):
415 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")})
416 self.dongle.exchange.side_effect = CommException("some other message", 0xFFFF)
417 with self.assertRaises(HSM2DongleError):
418 self.assertEqual("aabbccddee", self.hsm2dongle.get_public_key(key_id))
419 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]])
422class TestHSM2DongleSignUnauthorized(TestHSM2DongleBase):
423 @patch("ledger.hsm2dongle.HSM2DongleSignature")
424 def test_sign_unauthorized_ok(self, HSM2DongleSignatureMock):
425 HSM2DongleSignatureMock.return_value = "the-signature"
426 self.dongle.exchange.side_effect = [
427 bytes([0, 0, 0x81, 0x55, 0x66, 0x77, 0x88]), # Response to path and hash
428 ]
429 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")})
430 self.assertEqual(
431 (True, "the-signature"),
432 self.hsm2dongle.sign_unauthorized(key_id=key_id, hash="aabbccddeeff"),
433 )
435 self.assert_exchange([
436 [
437 0x02,
438 0x01,
439 0x11,
440 0x22,
441 0x33,
442 0x44,
443 0xAA,
444 0xBB,
445 0xCC,
446 0xDD,
447 0xEE,
448 0xFF,
449 ], # Path and hash
450 ])
451 self.assertEqual(
452 [call(bytes([0x55, 0x66, 0x77, 0x88]))],
453 HSM2DongleSignatureMock.call_args_list,
454 )
456 @patch("ledger.hsm2dongle.HSM2DongleSignature")
457 def test_sign_unauthorized_invalid_signature(self, HSM2DongleSignatureMock):
458 HSM2DongleSignatureMock.side_effect = ValueError()
459 self.dongle.exchange.side_effect = [
460 bytes([0, 0, 0x81, 0x55, 0x66, 0x77, 0x88]), # Response to path and hash
461 ]
462 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")})
463 self.assertEqual(
464 (False, -10),
465 self.hsm2dongle.sign_unauthorized(key_id=key_id, hash="aabbccddeeff"),
466 )
468 self.assert_exchange([
469 [
470 0x02,
471 0x01,
472 0x11,
473 0x22,
474 0x33,
475 0x44,
476 0xAA,
477 0xBB,
478 0xCC,
479 0xDD,
480 0xEE,
481 0xFF,
482 ], # Path and hash
483 ])
484 self.assertEqual(
485 [call(bytes([0x55, 0x66, 0x77, 0x88]))],
486 HSM2DongleSignatureMock.call_args_list,
487 )
489 @parameterized.expand([
490 ("data_size", 0x6A87, -5),
491 ("data_size_noauth", 0x6A91, -5),
492 ("invalid_path", 0x6A8F, -1),
493 ("data_size_auth", 0x6A90, -1),
494 ("unknown", 0x6AFF, -10),
495 ("btc_tx", [0, 0, 0x02], -5),
496 ("unexpected", [0, 0, 0xAA], -10),
497 ])
498 def test_sign_unauthorized_dongle_error_result(self, _, device_error,
499 expected_response):
500 if type(device_error) == int:
501 last_exchange = CommException("msg", device_error)
502 else:
503 last_exchange = bytes(device_error)
504 self.dongle.exchange.side_effect = [last_exchange] # Response to path and hash
505 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")})
506 self.assertEqual(
507 (False, expected_response),
508 self.hsm2dongle.sign_unauthorized(key_id=key_id, hash="aabbccddeeff"),
509 )
511 self.assert_exchange([
512 [
513 0x02,
514 0x01,
515 0x11,
516 0x22,
517 0x33,
518 0x44,
519 0xAA,
520 0xBB,
521 0xCC,
522 0xDD,
523 0xEE,
524 0xFF,
525 ], # Path and hash
526 ])
528 def test_sign_unauthorized_invalid_hash(self):
529 self.assertEqual(
530 (False, -5),
531 self.hsm2dongle.sign_unauthorized(key_id="doesn't matter", hash="not-a-hex"),
532 )
534 self.assertFalse(self.dongle.exchange.called)
537class TestHSM2DongleBlockchainState(TestHSM2DongleBase):
538 def test_get_blockchain_state_ok(self):
539 self.dongle.exchange.side_effect = [
540 bytes([0, 0, 0x01, 0x01]) +
541 bytes.fromhex("11"*32), # Response to get best_block
542 bytes([0, 0, 0x01, 0x02]) +
543 bytes.fromhex("22"*32), # Response to get newest_valid_block
544 bytes([0, 0, 0x01, 0x03]) +
545 bytes.fromhex("33"*32), # Response to get ancestor_block
546 bytes([0, 0, 0x01, 0x05]) +
547 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root
548 bytes([0, 0, 0x01, 0x81]) +
549 bytes.fromhex("55"*32), # Response to get updating.best_block
550 bytes([0, 0, 0x01, 0x82]) +
551 bytes.fromhex("66"*32), # Response to get updating.newest_valid_block
552 bytes([0, 0, 0x01, 0x84]) +
553 bytes.fromhex("77"*32), # Response to get updating.next_expected_block
554 bytes([0, 0, 0x02]) +
555 bytes.fromhex("112233445566"), # Response to get difficulty
556 bytes([0, 0, 0x03, 0x00, 0xFF, 0xFF]), # Response to get flags
557 ]
558 self.assertEqual(
559 {
560 "best_block":
561 "11"*32,
562 "newest_valid_block":
563 "22"*32,
564 "ancestor_block":
565 "33"*32,
566 "ancestor_receipts_root":
567 "44"*32,
568 "updating.best_block":
569 "55"*32,
570 "updating.newest_valid_block":
571 "66"*32,
572 "updating.next_expected_block":
573 "77"*32,
574 "updating.total_difficulty":
575 int.from_bytes(
576 bytes.fromhex("112233445566"), byteorder="big", signed=False),
577 "updating.in_progress":
578 False,
579 "updating.already_validated":
580 True,
581 "updating.found_best_block":
582 True,
583 },
584 self.hsm2dongle.get_blockchain_state(),
585 )
587 self.assert_exchange([
588 [0x20, 0x01, 0x01],
589 [0x20, 0x01, 0x02],
590 [0x20, 0x01, 0x03],
591 [0x20, 0x01, 0x05],
592 [0x20, 0x01, 0x81],
593 [0x20, 0x01, 0x82],
594 [0x20, 0x01, 0x84],
595 [0x20, 0x02],
596 [0x20, 0x03],
597 ])
599 def test_get_blockchain_state_error_hash(self):
600 self.dongle.exchange.side_effect = [
601 bytes([0, 0, 0x01, 0x01]) +
602 bytes.fromhex("11"*32), # Response to get best_block
603 bytes([0, 0, 0x01, 0x02]) +
604 bytes.fromhex("22"*32), # Response to get newest_valid_block
605 bytes([0, 0, 0x01, 0x03]) +
606 bytes.fromhex("33"*32), # Response to get ancestor_block
607 bytes([0, 0, 0x01, 0x05]) +
608 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root
609 bytes([0, 0, 0xAA]), # Response to get updating.best_block
610 ]
612 with self.assertRaises(HSM2DongleError):
613 self.hsm2dongle.get_blockchain_state()
615 self.assert_exchange([
616 [0x20, 0x01, 0x01],
617 [0x20, 0x01, 0x02],
618 [0x20, 0x01, 0x03],
619 [0x20, 0x01, 0x05],
620 [0x20, 0x01, 0x81],
621 ])
623 def test_get_blockchain_state_error_difficulty(self):
624 self.dongle.exchange.side_effect = [
625 bytes([0, 0, 0x01, 0x01]) +
626 bytes.fromhex("11"*32), # Response to get best_block
627 bytes([0, 0, 0x01, 0x02]) +
628 bytes.fromhex("22"*32), # Response to get newest_valid_block
629 bytes([0, 0, 0x01, 0x03]) +
630 bytes.fromhex("33"*32), # Response to get ancestor_block
631 bytes([0, 0, 0x01, 0x05]) +
632 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root
633 bytes([0, 0, 0x01, 0x81]) +
634 bytes.fromhex("55"*32), # Response to get ancestor_receipts_root
635 bytes([0, 0, 0x01, 0x82]) +
636 bytes.fromhex("66"*32), # Response to get ancestor_receipts_root
637 bytes([0, 0, 0x01, 0x84]) +
638 bytes.fromhex("77"*32), # Response to get ancestor_receipts_root
639 CommException("a-message"),
640 ]
642 with self.assertRaises(HSM2DongleError):
643 self.hsm2dongle.get_blockchain_state()
645 self.assert_exchange([
646 [0x20, 0x01, 0x01],
647 [0x20, 0x01, 0x02],
648 [0x20, 0x01, 0x03],
649 [0x20, 0x01, 0x05],
650 [0x20, 0x01, 0x81],
651 [0x20, 0x01, 0x82],
652 [0x20, 0x01, 0x84],
653 [0x20, 0x02],
654 ])
656 def test_get_blockchain_state_error_flags(self):
657 self.dongle.exchange.side_effect = [
658 bytes([0, 0, 0x01, 0x01]) +
659 bytes.fromhex("11"*32), # Response to get best_block
660 bytes([0, 0, 0x01, 0x02]) +
661 bytes.fromhex("22"*32), # Response to get newest_valid_block
662 bytes([0, 0, 0x01, 0x03]) +
663 bytes.fromhex("33"*32), # Response to get ancestor_block
664 bytes([0, 0, 0x01, 0x05]) +
665 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root
666 bytes([0, 0, 0x01, 0x81]) +
667 bytes.fromhex("55"*32), # Response to get ancestor_receipts_root
668 bytes([0, 0, 0x01, 0x82]) +
669 bytes.fromhex("66"*32), # Response to get ancestor_receipts_root
670 bytes([0, 0, 0x01, 0x84]) +
671 bytes.fromhex("77"*32), # Response to get ancestor_receipts_root
672 bytes([0, 0, 0x02, 0xFF]), # Response to get difficulty
673 bytes([0, 0, 0x04]), # Response to get flags
674 ]
676 with self.assertRaises(HSM2DongleError):
677 self.hsm2dongle.get_blockchain_state()
679 self.assert_exchange([
680 [0x20, 0x01, 0x01],
681 [0x20, 0x01, 0x02],
682 [0x20, 0x01, 0x03],
683 [0x20, 0x01, 0x05],
684 [0x20, 0x01, 0x81],
685 [0x20, 0x01, 0x82],
686 [0x20, 0x01, 0x84],
687 [0x20, 0x02],
688 [0x20, 0x03],
689 ])
691 def test_reset_advance_blockchain_ok(self):
692 self.dongle.exchange.side_effect = [
693 bytes([0, 0, 0x02]), # Response
694 ]
695 self.assertTrue(self.hsm2dongle.reset_advance_blockchain())
697 self.assert_exchange([
698 [0x21, 0x01],
699 ])
701 def test_reset_advance_blockchain_invalid_response(self):
702 self.dongle.exchange.side_effect = [
703 bytes([0, 0, 0xAA]), # Response
704 ]
705 with self.assertRaises(HSM2DongleError):
706 self.hsm2dongle.reset_advance_blockchain()
708 self.assert_exchange([
709 [0x21, 0x01],
710 ])
712 def test_reset_advance_blockchain_exception(self):
713 self.dongle.exchange.side_effect = [CommException("a-message")]
714 with self.assertRaises(HSM2DongleError):
715 self.hsm2dongle.reset_advance_blockchain()
717 self.assert_exchange([
718 [0x21, 0x01],
719 ])