Coverage for ledger/hsm2dongle.py: 89%
692 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-05 20:41 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-05 20:41 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23import struct
24from enum import IntEnum, Enum, auto
25from ledgerblue.comm import getDongle
26from ledgerblue.commException import CommException
27import hid
28from .signature import HSM2DongleSignature
29from .version import HSM2FirmwareVersion
30from .parameters import HSM2FirmwareParameters
31from .hsm2dongle_cmds import HSM2SignerHeartbeat, HSM2UIHeartbeat
32from .block_utils import (
33 rlp_mm_payload_size,
34 remove_mm_fields_if_present,
35 get_coinbase_txn,
36 get_block_hash,
37)
38from comm.bitcoin import encode_varint
39from comm.pow import coinbase_tx_get_hash
40import logging
42# Enumerations
44# Dongle commands
class _Command(IntEnum):
    """Command bytes sent to the dongle right after the CLA byte.

    A few values are shared between applications. Because this is an
    ``IntEnum``, a repeated value turns the later name into an alias of the
    earlier one: ``SIGN`` aliases ``ECHO`` and ``SIGNER_ATT`` aliases
    ``UI_ATT``.
    """

    IS_ONBOARD = 0x06
    ECHO = 0x02  # UI command
    SIGN = 0x02  # Signer command
    GET_PUBLIC_KEY = 0x04
    SEND_PIN = 0x41
    UNLOCK = 0xFE
    CHANGE_PIN = 0x08
    GET_MODE = 0x43
    EXIT_MENU = 0xFF
    EXIT_MENU_NO_AUTOEXEC = 0xFA
    GET_STATE = 0x20
    RESET_AB = 0x21
    ADVANCE = 0x10
    UPD_ANCESTOR = 0x30
    GET_PARAMETERS = 0x11
    SEED = 0x44
    WIPE = 0x07
    UI_ATT = 0x50
    SIGNER_ATT = 0x50
    SIGNER_AUTH = 0x51
    RETRIES = 0x45
71# Sign command OPs
class _SignOps(IntEnum):
    """Operation bytes exchanged while running the sign command."""

    PATH = 0x01
    BTC_TX = 0x02
    TX_RECEIPT = 0x04
    MERKLE_PROOF = 0x08
    SUCCESS = 0x81
80# Get blockchain state command OPs
class _GetStateOps(IntEnum):
    """Operation bytes of the get blockchain state command."""

    HASH = 0x01
    DIFF = 0x02
    FLAGS = 0x03
87# Reset advance blockchain command OPs
class _ResetAdvanceOps(IntEnum):
    """Operation bytes of the reset advance blockchain command."""

    INIT = 0x01
    DONE = 0x02
93# Advance blockchain command OPs
class _AdvanceOps(IntEnum):
    """Operation bytes of the advance blockchain command."""

    INIT = 0x02
    HEADER_META = 0x03
    HEADER_CHUNK = 0x04
    PARTIAL = 0x05
    SUCCESS = 0x06
    BROTHER_LIST_META = 0x07
    BROTHER_META = 0x08
    BROTHER_CHUNK = 0x09
105# Update ancestor command OPs
class _UpdateAncestorOps(IntEnum):
    """Operation bytes of the update ancestor command."""

    INIT = 0x02
    HEADER_META = 0x03
    HEADER_CHUNK = 0x04
    SUCCESS = 0x05
113# UI attestation OPs
class _UIAttestationOps(IntEnum):
    """Operation bytes of the UI attestation command."""

    OP_UD_VALUE = 0x01
    OP_GET_MSG = 0x02
    OP_GET = 0x03
    OP_APP_HASH = 0x04
121# Signer attestation OPs
class _SignerAttestationOps(IntEnum):
    """Operation bytes of the signer attestation command."""

    OP_GET = 0x01
    OP_GET_MESSAGE = 0x02
    OP_APP_HASH = 0x03
128# Signer authorization OPs (and results)
class _SignerAuthorizationOps(IntEnum):
    """Operation bytes (and result bytes) of the signer authorization command.

    The result names reuse the operation values, so under ``IntEnum`` rules
    ``OP_SIGN_RES_MORE`` is an alias of ``OP_SIGVER`` and
    ``OP_SIGN_RES_SUCCESS`` is an alias of ``OP_SIGN``.
    """

    # Operations
    OP_SIGVER = 0x01
    OP_SIGN = 0x02

    # Results
    OP_SIGN_RES_MORE = 0x01
    OP_SIGN_RES_SUCCESS = 0x02
136# Command Ops
class _Ops:
    """Namespace grouping the per-command operation enumerations."""

    SIGN = _SignOps
    GST = _GetStateOps
    RAV = _ResetAdvanceOps
    ADVANCE = _AdvanceOps
    UPD_ANCESTOR = _UpdateAncestorOps
    UI_ATT = _UIAttestationOps
    SIGNER_ATT = _SignerAttestationOps
    SIGNER_AUTH = _SignerAuthorizationOps
148# Protocol offsets
class _Offset(IntEnum):
    """Byte positions of the fields within a protocol message."""

    CLA = 0
    CMD = 1
    OP = 2
    DATA = 3
156# Device modes
class _Mode(IntEnum):
    """Modes the device can report itself to be in."""

    BOOTLOADER = 0x02
    SIGNER = 0x03
    UI_HEARTBEAT = 0x04
    UNKNOWN = 0xFF
164# Get blockchain state flag indexes
class _GetStateFlagOffset(IntEnum):
    """Index of each flag within a get blockchain state flags response."""

    IN_PROGRESS = 0
    ALREADY_VALIDATED = 1
    FOUND_BEST_BLOCK = 2
171# Get blockchain state constants
class _GetState:
    """Constants used by the get blockchain state command."""

    # Indexes of the individual flags within a flags response
    FLAG_OFFSET = _GetStateFlagOffset

    # Name of each state hash mapped to the identifier used to request it
    HASH_VALUES = {
        "best_block": 0x01,
        "newest_valid_block": 0x02,
        "ancestor_block": 0x03,
        "ancestor_receipts_root": 0x05,
        "updating.best_block": 0x81,
        "updating.newest_valid_block": 0x82,
        "updating.next_expected_block": 0x84,
    }
185# Sign command errors
class _SignError(IntEnum):
    """Error codes returned by the sign command.

    Only the first code is spelled out; every following member takes the next
    consecutive integer through ``auto()``, so member order is significant.
    """

    DATA_SIZE = 0x6A87
    INPUT = auto()
    STATE = auto()
    RLP = auto()
    RLP_INT = auto()
    RLP_DEPTH = auto()
    TX_HASH_MISMATCH = auto()
    TX_VERSION = auto()
    INVALID_PATH = auto()
    DATA_SIZE_AUTH = auto()
    DATA_SIZE_NOAUTH = auto()
    NODE_VERSION = auto()
    SHARED_PREFIX_TOO_BIG = auto()
    RECEIPT_HASH_MISMATCH = auto()
    NODE_CHAINING_MISMATCH = auto()
    RECEIPT_ROOT_MISMATCH = auto()
    INVALID_SIGHASH_COMPUTATION_MODE = auto()
    INVALID_EXTRADATA_SIZE = auto()
207# Get public key command errors
class _GetPubKeyError(IntEnum):
    """Error codes returned by the get public key command."""

    DATA_SIZE = 0x6A87
212# Advance blockchain and update ancestor command errors
class _AdvanceUpdateError(IntEnum):
    """Error codes shared by the advance blockchain and update ancestor commands.

    ``PROT_INVALID`` fixes the base code; each member after it takes the next
    consecutive integer through ``auto()``, so member order is significant.
    """

    UNKNOWN = 0

    PROT_INVALID = 0x6B87
    RLP_INVALID = auto()
    BLOCK_TOO_OLD = auto()
    BLOCK_TOO_SHORT = auto()
    PARENT_HASH_INVALID = auto()
    RECEIPT_ROOT_INVALID = auto()
    BLOCK_NUM_INVALID = auto()
    BLOCK_DIFF_INVALID = auto()
    UMM_ROOT_INVALID = auto()
    BTC_HEADER_INVALID = auto()
    MERKLE_PROOF_INVALID = auto()
    BTC_CB_TXN_INVALID = auto()
    MM_RLP_LEN_MISMATCH = auto()
    BTC_DIFF_MISMATCH = auto()
    MERKLE_PROOF_MISMATCH = auto()
    MM_HASH_MISMATCH = auto()
    MERKLE_PROOF_OVERFLOW = auto()
    CB_TXN_OVERFLOW = auto()
    BUFFER_OVERFLOW = auto()
    CHAIN_MISMATCH = auto()
    TOTAL_DIFF_OVERFLOW = auto()
    ANCESTOR_TIP_MISMATCH = auto()
    CB_TXN_HASH_MISMATCH = auto()
    BROTHERS_TOO_MANY = auto()
    BROTHER_PARENT_MISMATCH = auto()
    BROTHER_SAME_AS_BLOCK = auto()
    BROTHER_ORDER_INVALID = auto()
class _UIError(IntEnum):
    """Error codes returned by UI commands."""

    INVALID_PIN = 0x69A0
class _UIAttestationError(IntEnum):
    """Error codes returned by the UI attestation command."""

    PROT_INVALID = 0x6A01
    NO_ONBOARD = 0x6A02
    INTERNAL = 0x6A99
class _SignerAttestationError(IntEnum):
    """Error codes returned by the signer attestation command."""

    PROT_INVALID = 0x6B00
    INTERNAL = 0x6B01
class _SignerAuthorizationError(IntEnum):
    """Error codes returned by the signer authorization command."""

    PROT_INVALID = 0x6A01
    INVALID_ITERATION = 0x6A03
    INVALID_SIGNATURE = 0x6A04
    INVALID_AUTH_INVALID_INDEX = 0x6A05
266# Error codes
class _Error:
    """Namespace grouping the per-command error code enumerations."""

    SIGN = _SignError
    GETPUBKEY = _GetPubKeyError
    ADVANCE = _AdvanceUpdateError
    UPD_ANCESTOR = _AdvanceUpdateError
    UI = _UIError
    UI_ATT = _UIAttestationError
    SIGNER_ATT = _SignerAttestationError
    SIGNER_AUTH = _SignerAuthorizationError

    @staticmethod
    def is_user_defined_error(code):
        """Tell whether ``code`` lies in the user-defined error code range.

        The user-defined (RSK firmware specific) range goes from 0x69A0 to
        0x6BFF, both ends included.
        """
        return 0x69A0 <= code <= 0x6BFF
284# Sign command responses to the user
class _SignResponse(IntEnum):
    """Failure codes the sign operations hand back to the caller."""

    ERROR_PATH = -1
    ERROR_BTC_TX = -2
    ERROR_TX_RECEIPT = -3
    ERROR_MERKLE_PROOF = -4
    ERROR_HASH = -5
    ERROR_UNEXPECTED = -10
294# Advance blockchain responses to the user
class _AdvanceResponse(IntEnum):
    """Result codes the advance blockchain operation hands back to the caller."""

    # Successful outcomes
    OK_TOTAL = 1
    OK_PARTIAL = 2

    # Failures
    ERROR_INIT = -1
    ERROR_COMPUTE_METADATA = -2
    ERROR_METADATA = -3
    ERROR_BLOCK_DATA = -4
    ERROR_INVALID_BLOCK = -5
    ERROR_POW_INVALID = -6
    ERROR_CHAINING_MISMATCH = -7
    ERROR_UNSUPPORTED_CHAIN = -8
    ERROR_INVALID_BROTHERS = -9
    ERROR_UNEXPECTED = -10
311# Update ancestor responses to the user
class _UpdateAncestorResponse(IntEnum):
    """Result codes the update ancestor operation hands back to the caller."""

    # Successful outcome
    OK_TOTAL = 1

    # Failures
    ERROR_INIT = -1
    ERROR_COMPUTE_METADATA = -2
    ERROR_METADATA = -3
    ERROR_BLOCK_DATA = -4
    ERROR_INVALID_BLOCK = -5
    ERROR_CHAINING_MISMATCH = -6
    ERROR_TIP_MISMATCH = -7
    ERROR_REMOVE_MM_FIELDS = -8
    ERROR_UNEXPECTED = -10
326# Responses
class _Response:
    """Namespace grouping the per-operation response enumerations."""

    SIGN = _SignResponse
    ADVANCE = _AdvanceResponse
    UPD_ANCESTOR = _UpdateAncestorResponse
333# Onboarding constants
class _Onboarding(IntEnum):
    """Numeric constants used while onboarding a device."""

    # Exact number of bytes an onboarding seed must have
    SEED_LENGTH = 32
    # Timeout used for the wipe command sent during onboarding
    TIMEOUT = 10
339# Sighash computation modes
class SighashComputationMode(Enum):
    """Sighash computation modes.

    Each member is declared as a ``(value, netvalue)`` pair: the first item
    becomes the regular enum value and the second one is stored in the
    ``netvalue`` attribute, which is what gets serialized when signing.
    """

    def __new__(cls, *args, **kwds):
        member = object.__new__(cls)
        member._value_, member.netvalue = args[0], args[1]
        return member

    LEGACY = "legacy", 0
    SEGWIT = "segwit", 1
class HSM2DongleBaseError(RuntimeError):
    """Common base class for the errors raised by the dongle layer."""

    @property
    def message(self):
        """First argument the error was built with, or None if it had none."""
        return self.args[0] if self.args else None
class HSM2DongleError(HSM2DongleBaseError):
    """Generic dongle error, with no extra behaviour over its base class."""
class HSM2DongleTimeoutError(HSM2DongleBaseError):
    """Error raised when an exchange with the dongle times out."""

    @staticmethod
    def is_timeout(exc):
        """Tell whether ``exc`` is the exception signalling a dongle timeout.

        That is the case only for an exception whose type is exactly
        CommException, with status word 0x6F00 and message "Timeout".
        """
        return bool(
            type(exc) is CommException
            and exc.sw == 0x6F00
            and exc.message == "Timeout"
        )
class HSM2DongleCommError(HSM2DongleBaseError):
    """Error raised when communication with the dongle fails."""

    @staticmethod
    def is_comm_error(exc):
        """Tell whether ``exc`` denotes a communication problem.

        Three cases qualify: an instance of this very class, an exception
        whose type is exactly BaseException carrying the single argument
        "Error while writing", or one whose type is exactly OSError carrying
        the single argument "read error".
        """
        if isinstance(exc, HSM2DongleCommError):
            return True

        # Exact type of the exception mapped to the only argument it must carry
        expected_argument = {
            BaseException: "Error while writing",
            OSError: "read error",
        }.get(type(exc))
        if expected_argument is None:
            return False

        return bool(len(exc.args) == 1 and exc.args[0] == expected_argument)
class HSM2DongleErrorResult(HSM2DongleBaseError):
    """Error carrying an error code returned by the dongle."""

    @property
    def error_code(self):
        """Error code the exception was built with (its first argument)."""
        code = self.args[0]
        return code

    def __str__(self):
        code_as_hex = hex(self.error_code)
        return f"Dongle returned error code {code_as_hex}"
396# Handles low-level communication with a powHSM dongle
397class HSM2Dongle:
398 # Ledger constants
399 HASH_SIZE = 32
401 # APDU prefix
402 CLA = 0x80
404 # Enumeration shorthands
405 OFF = _Offset
406 CMD = _Command
407 MODE = _Mode
408 OP = _Ops
409 ERR = _Error
410 GST = _GetState
411 RESPONSE = _Response
412 ONBOARDING = _Onboarding
414 # Dongle exchange timeout
415 DONGLE_TIMEOUT = 10 # seconds
417 # Maximum pages expected to conform the UI attestation message
418 MAX_PAGES_UI_ATT_MESSAGE = 4
420 # Size of the iteration parameter for the signer authorization
421 SIGNER_AUTH_ITERATION_SIZE = 2
423 # Shorthand for externally defined commands
424 ErrorResult = HSM2DongleErrorResult
426 def __init__(self, debug):
427 self.logger = logging.getLogger("dongle")
428 self.debug = debug
429 self.last_comm_exception = None
431 # Send command to device
432 def _send_command(self, command, data=b"", timeout=DONGLE_TIMEOUT):
433 self.last_comm_exception = None
434 try:
435 cmd = struct.pack("BB%ds" % len(data), self.CLA, command, data)
436 self.logger.debug("Sending command: 0x%s", cmd.hex())
437 result = self.dongle.exchange(cmd, timeout=timeout)
438 self.logger.debug("Received: 0x%s", result.hex())
439 except (CommException, BaseException) as e:
440 # If this is a user-defined error, raise an
441 # error result error
442 if type(e) == CommException:
443 self.last_comm_exception = e
444 error_code = e.sw
445 if _Error.is_user_defined_error(error_code):
446 self.logger.error("Received error code: %s", hex(error_code))
447 raise HSM2DongleErrorResult(error_code)
449 # If this is a dongle timeout, raise a timeout error
450 if HSM2DongleTimeoutError.is_timeout(e):
451 raise HSM2DongleTimeoutError(str(e))
453 # If this is a dongle communication problem, raise a comm error
454 if HSM2DongleCommError.is_comm_error(e):
455 raise HSM2DongleCommError(str(e))
457 # Raise a standard error, but
458 # report differently for a CommException and any other
459 # type of exception
460 if type(e) == CommException:
461 msg = "Error sending command: %s" % str(e)
462 self.logger.error(msg)
463 else:
464 msg = "Unknown error sending command: %s (of type %s)" % \
465 (str(e), type(e).__name__)
466 self.logger.critical(msg)
468 raise HSM2DongleError(msg)
470 return result
472 # Send command version to be used by command classes
473 def send_command(self, cmd, op, data, timeout=DONGLE_TIMEOUT):
474 return self._send_command(cmd, bytes([op]) + data, timeout)
476 # Connect to the dongle
477 def connect(self):
478 try:
479 self.logger.info("Connecting")
480 self.dongle = getDongle(self.debug)
481 self.logger.info("Connected")
482 except CommException as e:
483 msg = "Error connecting: %s" % e.message
484 self.logger.error(msg)
485 raise HSM2DongleCommError(msg)
487 # Disconnect from dongle
488 def disconnect(self):
489 try:
490 self.logger.info("Disconnecting")
491 if self.dongle and self.dongle.opened:
492 self.dongle.close()
493 # **** Begin hack ****
494 # When running within a docker container,
495 # the hidapi library fails to detect a physical
496 # usb device reconnection. This will "hard reset" the
497 # stack so that a potential physical device reconnection
498 # can be detected.
499 try:
500 hid.hidapi_exit()
501 except Exception:
502 # hidapi_exit() can sometimes throw. we don't care
503 pass
504 # **** End hack ****
505 self.logger.info("Disconnected")
506 except CommException as e:
507 msg = "Error disconnecting: %s" % e.message
508 self.logger.error(msg)
509 raise HSM2DongleCommError(msg)
511 # Return device mode
512 def get_current_mode(self):
513 try:
514 apdu_rcv = self._send_command(self.CMD.GET_MODE)
515 return self.MODE(apdu_rcv[1])
516 except HSM2DongleError:
517 return self.MODE.UNKNOWN
519 # Echo message
520 def echo(self):
521 message = bytes([0x41, 0x42, 0x43])
522 result = bytes(self._send_command(self.CMD.ECHO, message))
523 # Result should be the command plus the message
524 expected_result = bytes([self.CLA, self.CMD.ECHO]) + message
525 return result == expected_result
527 # Return true if the hsm2 is onboarded
528 def is_onboarded(self):
529 self.logger.info("Sending isOnboarded")
530 apdu_rcv = self._send_command(self.CMD.IS_ONBOARD)
531 is_onboard = apdu_rcv[1] == 1
532 self.logger.info("isOnboarded: %s", "yes" if is_onboard else "no")
533 return is_onboard
535 # Attempt to onboard the device using the given seed and pin
536 # Return the generated backup
537 def onboard(self, seed, pin):
538 if type(seed) != bytes or len(seed) != self.ONBOARDING.SEED_LENGTH:
539 raise HSM2DongleError("Invalid seed given")
541 self.logger.info("Sending seed")
542 for i, b in enumerate(seed):
543 self._send_command(self.CMD.SEED, bytes([i, b]))
545 self.logger.info("Sending pin")
546 self._send_pin(pin, True)
548 self.logger.info("Sending wipe")
549 apdu_rcv = self._send_command(self.CMD.WIPE, timeout=self.ONBOARDING.TIMEOUT)
551 if apdu_rcv[1] != 2:
552 raise HSM2DongleError("Error onboarding. Got '%s'" % apdu_rcv.hex())
554 return True
556 # send PIN to device, optionally prepending its length
557 def _send_pin(self, pin, prepend_length):
558 final_pin = pin
559 if prepend_length:
560 final_pin = bytes([len(pin)]) + final_pin
562 for i in range(len(final_pin)):
563 self._send_command(self.CMD.SEND_PIN, bytes([i, final_pin[i]]))
565 # unlock the device with the PIN sent
566 def unlock(self, pin):
567 # Send the pin, then send the unlock command per se
568 self._send_pin(pin, prepend_length=False)
569 apdu_rcv = self._send_command(self.CMD.UNLOCK, bytes([0x00, 0x00]))
571 # Zero indicates wrong pin. Nonzero indicates device unlocked
572 return apdu_rcv[2] != 0
574 # replace PIN with a new one
575 def new_pin(self, pin):
576 try:
577 # Send the pin, then send the replace command per se
578 self._send_pin(pin, prepend_length=True)
579 self._send_command(self.CMD.CHANGE_PIN)
580 # All is good
581 return True
582 except HSM2DongleErrorResult as e:
583 # Tried to set an invalid pin
584 if e.error_code == self.ERR.UI.INVALID_PIN:
585 return False
586 # Something else happened
587 raise e
589 # returns an instance of HSM2FirmwareVersion representing
590 # the version of the currently running firmware on the HSM2
591 # that is connected (i.e., could be either the signer or ui)
592 def get_version(self):
593 apdu_rcv = self._send_command(self.CMD.IS_ONBOARD)
594 return HSM2FirmwareVersion(apdu_rcv[2], apdu_rcv[3], apdu_rcv[4])
596 # returns the number of pin retries available
597 def get_retries(self):
598 apdu_rcv = self._send_command(self.CMD.RETRIES)
599 return apdu_rcv[2]
601 # returns an instance of HSM2FirmwareParameters representing
602 # the parameters of the currently running firmware on the HSM2
603 # that is connected (it should be running the signer).
604 def get_signer_parameters(self):
605 try:
606 apdu_rcv = self._send_command(self.CMD.GET_PARAMETERS)
607 return HSM2FirmwareParameters.from_dongle_format(apdu_rcv[self.OFF.DATA:])
608 except ValueError as e:
609 msg = "While getting signer firmware parameters: %s" % str(e)
610 self.logger.error(msg)
611 raise HSM2DongleError(msg)
613 # exit the ledger nano S menu
614 def exit_menu(self, autoexec=True):
615 self._send_command(
616 self.CMD.EXIT_MENU if autoexec else self.CMD.EXIT_MENU_NO_AUTOEXEC,
617 bytes([0x00, 0x00]),
618 )
620 # exit the current app
621 # could either be the UI bootloader, UI heartbeat or Signer
622 def exit_app(self):
623 self._send_command(self.CMD.EXIT_MENU)
625 # get the public key for a bip32 path
626 # key_id: BIP32Path
627 def get_public_key(self, key_id):
628 publicKey = self._send_command(self.CMD.GET_PUBLIC_KEY, key_id.to_binary())
629 return publicKey.hex()
631 # Ask the device to sign a specific input of a given unsigned bitcoin transaction
632 # using the given RSK transaction receipt as an authorization for the signature.
633 # key_id: BIP32Path
634 # rsk_tx_receipt: hex string
635 # btc_tx: hex string
636 # receipt_merkle_proof: list
637 # input_index: int
638 # sighash_computation_mode: a SighashComputationMode instance
639 # witness_script: hex string
640 # outpoint_value: int
641 def sign_authorized(
642 self, key_id, rsk_tx_receipt, receipt_merkle_proof, btc_tx, input_index,
643 sighash_computation_mode, witness_script, outpoint_value
644 ):
645 # *** Signing protocol ***
646 # The order in which things are required and then sent is:
647 # 1. BIP32 path & BTC tx input index (single message)
648 # 2. BTC transaction (several messages, as required by ledger)
649 # 3. RSK transaction receipt (several messages, as required by ledger)
650 # 4. RSK Tx receipt merkle proof (several messages, as required by ledger)
651 # (Note: in theory, one could depend only on the order in which
652 # the ledger requires data and send it blindly. In practice,
653 # we know exactly the order in which the device will require
654 # the data, and use that order to validate it as we go.)
655 #
656 # During these exchanges, an exception can be raised at any moment, which
657 # would signal failure signing.
658 # Specific error codes come with HSM2DongleErrorResult
659 # exception instances and are handled accordingly. Anything else
660 # is treated as an unexpected error and is let for the calling layer
661 # to handle.
663 # Step 1. Send path and input index
664 key_id_bytes = key_id.to_binary()
665 input_index_bytes = input_index.to_bytes(4, byteorder="little", signed=False)
666 data = bytes([self.OP.SIGN.PATH]) + key_id_bytes + input_index_bytes
667 try:
668 self.logger.debug("Sign: sending path - %s", data.hex())
669 response = self._send_command(self.CMD.SIGN, data)
671 # We expect the device to ask for the BTC tx next.
672 # If this doesn't happen, error out
673 if response[self.OFF.OP] != self.OP.SIGN.BTC_TX:
674 self.logger.error("Sign: unexpected response %s", response.hex())
675 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED)
677 # How many bytes to send in the next message
678 bytes_requested = response[self.OFF.DATA]
679 except HSM2DongleErrorResult as e:
680 self.logger.error("Sign returned: %s", hex(e.error_code))
681 if e.error_code in [
682 self.ERR.SIGN.DATA_SIZE,
683 self.ERR.SIGN.DATA_SIZE_AUTH,
684 self.ERR.SIGN.DATA_SIZE_NOAUTH,
685 ]:
686 return (False, self.RESPONSE.SIGN.ERROR_PATH)
687 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED)
689 # Step 2. Send BTC transaction and extra data
690 # Prefix the BTC transaction with the total length of the payload encoded as a
691 # 4 bytes little endian unsigned integer. The total length should include
692 # those 4 bytes plus 2 bytes for the length of the extradata and 1 byte for
693 # the sighash computation mode
694 try:
695 PAYLOADLENGTH_LENGTH = 4
696 SIGHASH_COMPUTATION_MODE_LENGTH = 1
697 EXTRADATALENGTH_LENGTH = 2
698 OUTPOINT_VALUE_LENGTH = 8
700 btc_tx_bytes = bytes.fromhex(btc_tx)
702 scm_bytes = sighash_computation_mode.netvalue.to_bytes(
703 SIGHASH_COMPUTATION_MODE_LENGTH,
704 byteorder='little', signed=False
705 )
707 ed_bytes = b""
709 if sighash_computation_mode == SighashComputationMode.SEGWIT:
710 ov_bytes = outpoint_value.to_bytes(
711 OUTPOINT_VALUE_LENGTH,
712 byteorder='little', signed=False
713 )
715 ws_bytes = bytes.fromhex(witness_script)
716 ws_length_bytes = bytes.fromhex(encode_varint(len(ws_bytes)))
718 ed_bytes = ws_length_bytes + ws_bytes + ov_bytes
720 edl_bytes = len(ed_bytes).to_bytes(
721 EXTRADATALENGTH_LENGTH,
722 byteorder='little', signed=False
723 )
725 payload_length = \
726 PAYLOADLENGTH_LENGTH + \
727 SIGHASH_COMPUTATION_MODE_LENGTH + \
728 EXTRADATALENGTH_LENGTH + \
729 len(btc_tx_bytes)
731 payload_length_bytes = payload_length.to_bytes(
732 PAYLOADLENGTH_LENGTH, byteorder="little", signed=False
733 )
735 data = payload_length_bytes + scm_bytes + edl_bytes + btc_tx_bytes + ed_bytes
737 response = self._send_data_in_chunks(
738 command=self.CMD.SIGN,
739 operation=self.OP.SIGN.BTC_TX,
740 next_operations=[self.OP.SIGN.TX_RECEIPT],
741 data=data,
742 expect_full_data=True,
743 initial_bytes=bytes_requested,
744 operation_name="sign",
745 data_description="BTC tx",
746 )
748 if not response[0]:
749 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED)
751 bytes_requested = response[1][self.OFF.DATA]
752 except HSM2DongleErrorResult as e:
753 self.logger.error("Sign returned: %s", hex(e.error_code))
754 if e.error_code in [
755 self.ERR.SIGN.INPUT,
756 self.ERR.SIGN.DATA_SIZE,
757 self.ERR.SIGN.TX_HASH_MISMATCH,
758 self.ERR.SIGN.TX_VERSION,
759 self.ERR.SIGN.INVALID_SIGHASH_COMPUTATION_MODE,
760 self.ERR.SIGN.INVALID_EXTRADATA_SIZE,
761 ]:
762 return (False, self.RESPONSE.SIGN.ERROR_BTC_TX)
763 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED)
765 # Step 3. Send transaction receipt
766 try:
767 response = self._send_data_in_chunks(
768 command=self.CMD.SIGN,
769 operation=self.OP.SIGN.TX_RECEIPT,
770 next_operations=[self.OP.SIGN.MERKLE_PROOF],
771 data=bytes.fromhex(rsk_tx_receipt),
772 expect_full_data=True,
773 initial_bytes=bytes_requested,
774 operation_name="sign",
775 data_description="tx receipt",
776 )
778 if not response[0]:
779 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED)
781 bytes_requested = response[1][self.OFF.DATA]
782 except HSM2DongleErrorResult as e:
783 self.logger.error("Sign returned: %s", hex(e.error_code))
784 if e.error_code in [
785 self.ERR.SIGN.STATE,
786 self.ERR.SIGN.RLP,
787 self.ERR.SIGN.RLP_INT,
788 self.ERR.SIGN.RLP_DEPTH,
789 self.ERR.SIGN.DATA_SIZE,
790 ]:
791 return (False, self.RESPONSE.SIGN.ERROR_TX_RECEIPT)
792 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED)
794 # Step 4. Send tx receipt merkle proof
795 # The format for the receipts merkle proof is as follows:
796 # 1 byte for the number of nodes
797 # For each node: 1 byte for the node length + the node bytes.
798 try:
799 if len(receipt_merkle_proof) > 255:
800 raise ValueError("Too many nodes")
802 merkle_proof_bytes = bytes([len(receipt_merkle_proof)])
803 for node in receipt_merkle_proof:
804 node_bytes = bytes.fromhex(node)
805 if len(node_bytes) > 255:
806 raise ValueError("Node too big: %s" % node)
807 merkle_proof_bytes = (
808 merkle_proof_bytes + bytes([len(node_bytes)]) + node_bytes
809 )
810 except ValueError as e:
811 self.logger.error("Sign: invalid receipts merkle proof: %s", str(e))
812 return (False, self.RESPONSE.SIGN.ERROR_MERKLE_PROOF)
814 try:
815 response = self._send_data_in_chunks(
816 command=self.CMD.SIGN,
817 operation=self.OP.SIGN.MERKLE_PROOF,
818 next_operations=[self.OP.SIGN.SUCCESS],
819 data=merkle_proof_bytes,
820 expect_full_data=True,
821 initial_bytes=bytes_requested,
822 operation_name="sign",
823 data_description="receipts merkle proof",
824 )
826 if not response[0]:
827 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED)
828 except HSM2DongleErrorResult as e:
829 self.logger.error("Sign returned: %s", hex(e.error_code))
830 if e.error_code in [
831 self.ERR.SIGN.DATA_SIZE,
832 self.ERR.SIGN.STATE,
833 self.ERR.SIGN.NODE_VERSION,
834 self.ERR.SIGN.SHARED_PREFIX_TOO_BIG,
835 self.ERR.SIGN.RECEIPT_HASH_MISMATCH,
836 self.ERR.SIGN.NODE_CHAINING_MISMATCH,
837 self.ERR.SIGN.RECEIPT_ROOT_MISMATCH,
838 ]:
839 return (False, self.RESPONSE.SIGN.ERROR_MERKLE_PROOF)
840 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED)
842 # If we get here, we should have a signature in the data part.
843 # Return success along with it.
844 try:
845 return (True, HSM2DongleSignature(response[1][self.OFF.DATA:]))
846 except Exception as e:
847 self.logger.error("Error parsing signature: %s", str(e))
848 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED)
850 # Ask the device to sign a specific hash without any authorization.
851 # key_id: BIP32Path
852 # hash: hex string
853 def sign_unauthorized(self, key_id, hash):
854 # *** Signing protocol ***
855 # This signing method requires a single message that contains the
856 # BIP32 path & hash to sign.
857 #
858 # An exception can be raised, which
859 # would signal failure signing. Specific error codes
860 # come with HSM2DongleErrorResult
861 # exception instances and are handled accordingly. Anything else
862 # is treated as an unexpected error and is let for the calling layer
863 # to handle.
865 try:
866 hash_bytes = bytes.fromhex(hash)
867 except ValueError:
868 self.logger.error("Sign: invalid hash - %s", hash)
869 return (False, self.RESPONSE.SIGN.ERROR_HASH)
871 # Send path and hash to sign
872 try:
873 key_id_bytes = key_id.to_binary()
874 data = bytes([self.OP.SIGN.PATH]) + key_id_bytes + hash_bytes
875 self.logger.debug("Sign: sending path and hash - %s", data.hex())
876 response = self._send_command(self.CMD.SIGN, data)
878 # Special case: if the device asks for a BTC transaction, then
879 # there's a case of both invalid path and invalid hash. Report invalid hash
880 if response[self.OFF.OP] == self.OP.SIGN.BTC_TX:
881 return (False, self.RESPONSE.SIGN.ERROR_HASH)
883 # We expect the device to report success signing
884 # If this doesn't happen, error out
885 if response[self.OFF.OP] != self.OP.SIGN.SUCCESS:
886 self.logger.error("Sign: unexpected response %s", response.hex())
887 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED)
888 except HSM2DongleErrorResult as e:
889 self.logger.error("Sign returned: %s", hex(e.error_code))
890 if e.error_code in [self.ERR.SIGN.DATA_SIZE, self.ERR.SIGN.DATA_SIZE_NOAUTH]:
891 return (False, self.RESPONSE.SIGN.ERROR_HASH)
892 elif e.error_code in [
893 self.ERR.SIGN.INVALID_PATH,
894 self.ERR.SIGN.DATA_SIZE_AUTH,
895 ]:
896 return (False, self.RESPONSE.SIGN.ERROR_PATH)
897 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED)
899 # If we get here, we should have a signature in the data part.
900 # Return success along with it.
901 try:
902 return (True, HSM2DongleSignature(response[self.OFF.DATA:]))
903 except Exception as e:
904 self.logger.error("Error parsing signature: %s", str(e))
905 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED)
907 def get_blockchain_state(self):
908 state = {}
910 # Get hashes
911 for (key, hash_cmd) in self.GST.HASH_VALUES.items():
912 self.logger.info("Getting hash value for '%s'", key)
913 result = self._send_command(
914 self.CMD.GET_STATE, bytes([self.OP.GST.HASH, hash_cmd])
915 )
917 # Validate result
918 if (
919 result[self.OFF.OP] != self.OP.GST.HASH
920 or result[self.OFF.DATA] != hash_cmd
921 or len(result[self.OFF.DATA + 1:]) != self.HASH_SIZE
922 ):
923 msg = "Invalid response for hash: %s" % result.hex()
924 self.logger.error(msg)
925 raise HSM2DongleError(msg)
927 state[key] = result[self.OFF.DATA + 1:].hex()
929 # Get difficulty
930 self.logger.info("Getting difficulty")
931 result = self._send_command(self.CMD.GET_STATE, bytes([self.OP.GST.DIFF]))
932 if result[self.OFF.OP] != self.OP.GST.DIFF:
933 msg = "Invalid response for difficulty: %s" % result.hex()
934 self.logger.error(msg)
935 raise HSM2DongleError(msg)
937 state["updating.total_difficulty"] = int.from_bytes(
938 result[self.OFF.DATA:], byteorder="big", signed=False
939 )
941 # Get flags
942 self.logger.info("Getting flags")
943 result = self._send_command(self.CMD.GET_STATE, bytes([self.OP.GST.FLAGS]))
944 if result[self.OFF.OP] != self.OP.GST.FLAGS or len(result[self.OFF.DATA:]) != 3:
945 msg = "Invalid response for flags: %s" % result.hex()
946 self.logger.error(msg)
947 raise HSM2DongleError(msg)
949 state["updating.in_progress"] = bool(
950 result[self.OFF.DATA + self.GST.FLAG_OFFSET.IN_PROGRESS]
951 )
952 state["updating.already_validated"] = bool(
953 result[self.OFF.DATA + self.GST.FLAG_OFFSET.ALREADY_VALIDATED]
954 )
955 state["updating.found_best_block"] = bool(
956 result[self.OFF.DATA + self.GST.FLAG_OFFSET.FOUND_BEST_BLOCK]
957 )
959 return state
961 def reset_advance_blockchain(self):
962 self.logger.info("Resetting advance blockchain")
963 result = self._send_command(self.CMD.RESET_AB, bytes([self.OP.RAV.INIT]))
964 if result[self.OFF.OP] != self.OP.RAV.DONE:
965 msg = "Invalid response for reset advance blockchain: %s" % result.hex()
966 self.logger.error(msg)
967 raise HSM2DongleError(msg)
969 return True
971 # Ask the device to update its blockchain references by processing
972 # a given set of blocks and their brothers.
973 # blocks: list of hex strings
974 # (each hex string is a raw block header,
975 # which should *always* include merge mining fields)
976 # brothers: list of list of hex strings
977 # (each list of hex strings is the block's brothers' headers
978 # for the corresponding block header in the same position
979 # of the blocks list)
def advance_blockchain(self, blocks, brothers):
    """Ask the device to advance its blockchain reference with the given blocks.

    blocks: list of hex strings, each one a raw block header.
    brothers: list of lists of hex strings; the list at position i holds
        the brother headers of the block at position i of ``blocks``.

    Returns whatever ``_do_block_operation`` returns: a
    ``(success, response_code)`` tuple.
    """
    codes = self.ERR.ADVANCE
    outcomes = self.RESPONSE.ADVANCE

    def by_block_hash(header):
        return bytes.fromhex(get_block_hash(header))

    # Each group of brothers is sent ordered by block hash
    sorted_brothers = [sorted(group, key=by_block_hash) for group in brothers]

    # Device error codes that can show up while sending header chunks,
    # grouped by the response code reported to the caller
    failure_groups = (
        (outcomes.ERROR_INVALID_BLOCK, (
            codes.BUFFER_OVERFLOW,
            codes.MERKLE_PROOF_OVERFLOW,
            codes.CB_TXN_OVERFLOW,
            codes.RLP_INVALID,
            codes.BLOCK_TOO_SHORT,
            codes.PARENT_HASH_INVALID,
            codes.UMM_ROOT_INVALID,
            codes.BTC_HEADER_INVALID,
            codes.MERKLE_PROOF_INVALID,
            codes.BLOCK_DIFF_INVALID,
            codes.BLOCK_NUM_INVALID,
            codes.BLOCK_TOO_OLD,
            codes.MM_RLP_LEN_MISMATCH,
        )),
        (outcomes.ERROR_POW_INVALID, (
            codes.MERKLE_PROOF_MISMATCH,
            codes.BTC_CB_TXN_INVALID,
            codes.MM_HASH_MISMATCH,
            codes.BTC_DIFF_MISMATCH,
            codes.CB_TXN_HASH_MISMATCH,
        )),
        (outcomes.ERROR_INVALID_BROTHERS, (
            codes.BROTHERS_TOO_MANY,
            codes.BROTHER_PARENT_MISMATCH,
            codes.BROTHER_SAME_AS_BLOCK,
            codes.BROTHER_ORDER_INVALID,
        )),
        (outcomes.ERROR_CHAINING_MISMATCH, (codes.CHAIN_MISMATCH,)),
        (outcomes.ERROR_UNSUPPORTED_CHAIN, (codes.TOTAL_DIFF_OVERFLOW,)),
        (outcomes.ERROR_BLOCK_DATA, (codes.PROT_INVALID,)),
    )
    chunk_error_mapping = {}
    for outcome, group in failure_groups:
        for code in group:
            chunk_error_mapping[code] = outcome

    return self._do_block_operation(
        "advance",
        blocks,
        sorted_brothers,
        self.CMD.ADVANCE,
        self.OP.ADVANCE,
        codes,
        outcomes,
        chunk_error_mapping,
    )
1030 # Ask the device to update its ancestor block and ancestor receipts root
1031 # references by processing a given set of blocks.
1032 # blocks: list of hex strings
1033 # (each hex string is a raw block header,
1034 # which doesn't need to include merge mining fields -
1035 # those will be stripped for efficiency before being sent
1036 # to the device anyway)
def update_ancestor(self, blocks):
    """Ask the device to update its ancestor block references.

    blocks: list of hex strings, each one a raw block header. Merge
        mining fields, if present, are stripped before sending.

    Returns ``(False, ERROR_REMOVE_MM_FIELDS)`` when stripping the merge
    mining fields raises ``ValueError``; otherwise whatever
    ``_do_block_operation`` returns (a ``(success, response_code)`` tuple).
    """
    codes = self.ERR.UPD_ANCESTOR
    outcomes = self.RESPONSE.UPD_ANCESTOR

    # Optimization: send the headers without their merge mining fields
    try:
        self.logger.info("Removing merge mining fields from %d blocks", len(blocks))
        stripped_blocks = [remove_mm_fields_if_present(header) for header in blocks]
    except ValueError as e:
        self.logger.error("While removing merge mining fields: %s", str(e))
        return (False, outcomes.ERROR_REMOVE_MM_FIELDS)

    # Device error codes that can show up while sending header chunks,
    # mapped to the response code reported to the caller
    chunk_error_mapping = dict.fromkeys(
        (
            codes.BUFFER_OVERFLOW,
            codes.RLP_INVALID,
            codes.BLOCK_TOO_SHORT,
            codes.PARENT_HASH_INVALID,
            codes.RECEIPT_ROOT_INVALID,
            codes.BTC_HEADER_INVALID,
            codes.BLOCK_NUM_INVALID,
            codes.BLOCK_TOO_OLD,
            codes.MM_RLP_LEN_MISMATCH,
        ),
        outcomes.ERROR_INVALID_BLOCK,
    )
    chunk_error_mapping[codes.ANCESTOR_TIP_MISMATCH] = outcomes.ERROR_TIP_MISMATCH
    chunk_error_mapping[codes.CHAIN_MISMATCH] = outcomes.ERROR_CHAINING_MISMATCH
    chunk_error_mapping[codes.PROT_INVALID] = outcomes.ERROR_BLOCK_DATA

    return self._do_block_operation(
        "updancestor",
        stripped_blocks,
        None,
        self.CMD.UPD_ANCESTOR,
        self.OP.UPD_ANCESTOR,
        codes,
        outcomes,
        chunk_error_mapping,
    )
def get_ui_attestation(self, ud_value_hex):
    """Gather the UI attestation from the device.

    ud_value_hex: hex string with the UD value to send to the device.

    Returns a dict with the hex-encoded ``app_hash``, ``message`` and
    ``signature`` reported by the device.

    Raises HSM2DongleError if the device is still announcing more message
    pages after ``MAX_PAGES_UI_ATT_MESSAGE`` pages have been retrieved.
    """
    # Parse hexadecimal values
    ud_value = bytes.fromhex(ud_value_hex)

    # Get UI hash
    ui_hash = self._send_command(
        self.CMD.UI_ATT, bytes([self.OP.UI_ATT.OP_APP_HASH])
    )[self.OFF.DATA:]

    # Send UD value
    data = bytes([self.OP.UI_ATT.OP_UD_VALUE]) + ud_value
    self._send_command(self.CMD.UI_ATT, data)

    # Retrieve message, one page at a time
    page = 0
    message = b""
    while True:
        if page == self.MAX_PAGES_UI_ATT_MESSAGE:
            # The placeholder was missing here, which made the "%" operator
            # raise TypeError instead of producing the error message
            msg = (
                "Maximum number of UI attestation pages exceeded (%d)"
                % self.MAX_PAGES_UI_ATT_MESSAGE
            )
            self.logger.error(msg)
            raise HSM2DongleError(msg)
        data = bytes([self.OP.UI_ATT.OP_GET_MSG, page])
        response = self._send_command(self.CMD.UI_ATT, data)
        page += 1
        # First data byte is the "more pages" flag, the rest is the payload
        message += response[self.OFF.DATA + 1:]
        if response[self.OFF.DATA] == 0:
            break

    # Retrieve attestation
    attestation = self._send_command(
        self.CMD.UI_ATT, bytes([self.OP.UI_ATT.OP_GET])
    )[self.OFF.DATA:]

    return {
        "app_hash": ui_hash.hex(),
        "message": message.hex(),
        "signature": attestation.hex(),
    }
def get_signer_attestation(self):
    """Gather the signer attestation from the device.

    Issues three signer attestation commands in order (app hash,
    attestation, message) and returns a dict with the hex-encoded
    ``app_hash``, ``message`` and ``signature``.
    """
    def request(op):
        # Send a single-byte operation and keep only the data section
        reply = self._send_command(self.CMD.SIGNER_ATT, bytes([op]))
        return reply[self.OFF.DATA:]

    att_ops = self.OP.SIGNER_ATT

    # Order matters: hash first, then attestation, then message
    signer_hash = request(att_ops.OP_APP_HASH)
    attestation = request(att_ops.OP_GET)
    message = request(att_ops.OP_GET_MESSAGE)

    return {
        "app_hash": signer_hash.hex(),
        "message": message.hex(),
        "signature": attestation.hex(),
    }
def get_signer_heartbeat(self, ud_value):
    """Run the signer heartbeat command on this dongle and return its result.

    ud_value is handed over unchanged to ``HSM2SignerHeartbeat.run``.
    """
    heartbeat = HSM2SignerHeartbeat(self)
    return heartbeat.run(ud_value)
def get_ui_heartbeat(self, ud_value):
    """Run the UI heartbeat command on this dongle and return its result.

    ud_value is handed over unchanged to ``HSM2UIHeartbeat.run``.
    """
    heartbeat = HSM2UIHeartbeat(self)
    return heartbeat.run(ud_value)
def authorize_signer(self, signer_authorization):
    """Authorize a signer version on the device.

    signer_authorization: object exposing ``signer_version`` (with a hex
        ``hash`` and an integer ``iteration``) and ``signatures`` (an
        iterable of hex strings).

    Returns True as soon as the device reports success for a signature.
    Raises HSM2DongleError if every signature was sent (or none was
    given) without the device reporting success.
    """
    auth_ops = self.OP.SIGNER_AUTH

    # Send signer version: hash followed by the big-endian iteration
    version = signer_authorization.signer_version
    version_payload = (
        bytes([auth_ops.OP_SIGVER])
        + bytes.fromhex(version.hash)
        + version.iteration.to_bytes(
            self.SIGNER_AUTH_ITERATION_SIZE, byteorder='big', signed=False
        )
    )
    self._send_command(self.CMD.SIGNER_AUTH, version_payload)

    # Send signatures one by one, stopping once the device is satisfied
    for signature in signer_authorization.signatures:
        payload = bytes([auth_ops.OP_SIGN]) + bytes.fromhex(signature)
        outcome = self._send_command(self.CMD.SIGNER_AUTH, payload)[self.OFF.DATA]
        if outcome == auth_ops.OP_SIGN_RES_SUCCESS:
            return True

    # Getting here means no signature led to a successful authorization
    raise HSM2DongleError("Not enough signatures given. "
                          "Signer authorization failed")
1168 # Used both for advance blockchain and update ancestor given the protocol
1169 # is very similar
def _do_block_operation(
    self,
    operation_name,
    blocks,
    brothers,
    command,
    ops,
    errors,
    responses,
    chunk_error_mapping,
):
    """Drive a full block operation (advance blockchain or update ancestor).

    Returns a ``(success, response_code)`` tuple, where the response code
    is taken from ``responses`` (or, for a failed header, is whatever
    ``_send_block_header`` reported).

    Raises HSM2DongleError if, after sending every block, the device
    reports neither total nor partial success. Any exception other than
    HSM2DongleErrorResult is left for the calling layer to handle.
    """
    # *** Block operation protocol ***
    # Things are requested by the device and sent in this order:
    # 1. Initialization: the total number of blocks to be sent.
    # 2. For each block header:
    #   2.1. Block metadata (single message):
    #        - MM payload size in bytes
    #          (see block_utils.rlp_mm_payload_size)
    #        - For advance blockchain only, the coinbase transaction
    #          hash (see block_utils.coinbase_tx_get_hash)
    #   2.2. Block chunks: header pieces as requested by the ledger.
    #   2.3. Brothers -- advance blockchain only:
    #     2.3.1. Brothers metadata (single message): brother count
    #     2.3.2. For each brother (if the count is greater than zero):
    #       2.3.2.1. Brother metadata (single message): MM payload size
    #                and coinbase transaction hash, as for blocks.
    #       2.3.2.2. Brother chunks: header pieces as requested by
    #                the ledger.
    #
    # A failure can be signalled at any point through an exception.
    # HSM2DongleErrorResult instances carry a device error code and are
    # translated into a response code here.
    label = operation_name.capitalize()
    block_count = len(blocks)

    # Step 1. Send initialization
    init_payload = bytes([ops.INIT]) + block_count.to_bytes(
        4, byteorder="big", signed=False
    )
    try:
        self.logger.info("%s: sending initialization - %s", label, init_payload.hex())
        result = self._send_command(command, init_payload)

        # The device must ask for block metadata next
        if result[self.OFF.OP] != ops.HEADER_META:
            self.logger.error("%s: unexpected response %s", label, result.hex())
            return (False, responses.ERROR_UNEXPECTED)
    except HSM2DongleErrorResult as e:
        self.logger.error("%s returned: %s", label, hex(e.error_code))
        if e.error_code == errors.PROT_INVALID:
            return (False, responses.ERROR_INIT)
        return (False, responses.ERROR_UNEXPECTED)

    # Step 2. Send blocks (and brothers, if any)
    for block_number, block in enumerate(blocks, 1):
        self.logger.info(
            "%s: sending block #%d/%d", label, block_number, block_count
        )

        result = self._send_block_header(
            operation_name=operation_name,
            header_name="block",
            block=block,
            command=command,
            ops=ops,
            op_meta=ops.HEADER_META,
            op_chunk=ops.HEADER_CHUNK,
            responses=responses,
            errors=errors,
            chunk_error_mapping=chunk_error_mapping
        )
        if not result[0]:
            return result

        # Step 2.3. Send brothers
        # *** Only for advance blockchain and if requested by the dongle ***
        brothers_requested = (
            command == self.CMD.ADVANCE
            and result[1][self.OFF.OP] == ops.BROTHER_LIST_META
        )
        if not brothers_requested:
            continue

        # Step 2.3.1. Send brother list metadata
        brother_list = brothers[block_number - 1]
        brother_count = len(brother_list)
        list_meta = bytes([ops.BROTHER_LIST_META]) + brother_count.to_bytes(
            1, byteorder="big", signed=False
        )
        try:
            self.logger.info(
                "%s: sending brother list metadata - %s", label, list_meta.hex()
            )
            # Same (flag, raw response) shape as _send_block_header results
            result = [None, self._send_command(command, list_meta)]

            # With at least one brother, the device must ask
            # for brother metadata next
            if brother_count > 0 and result[1][self.OFF.OP] != ops.BROTHER_META:
                self.logger.error(
                    "%s: unexpected response %s", label, result[1].hex()
                )
                return (False, responses.ERROR_UNEXPECTED)
        except HSM2DongleErrorResult as e:
            self.logger.error("%s returned: %s", label, hex(e.error_code))
            if e.error_code in (errors.PROT_INVALID, errors.BROTHERS_TOO_MANY):
                return (False, responses.ERROR_INVALID_BROTHERS)
            return (False, responses.ERROR_UNEXPECTED)

        # Step 2.3.2. Send each brother
        for brother_number, brother in enumerate(brother_list, 1):
            self.logger.info(
                "%s: sending brother #%d/%d", label, brother_number, brother_count
            )

            result = self._send_block_header(
                operation_name=operation_name,
                header_name="brother",
                block=brother,
                command=command,
                ops=ops,
                op_meta=ops.BROTHER_META,
                op_chunk=ops.BROTHER_CHUNK,
                responses=responses,
                errors=errors,
                chunk_error_mapping=chunk_error_mapping
            )
            if not result[0]:
                return result

    # What the device answered to the very last message
    final_op = result[1][self.OFF.OP]

    # Partial success?
    if command == self.CMD.ADVANCE and final_op == ops.PARTIAL:
        self.logger.info("%s: partial success", label)
        return (True, responses.OK_PARTIAL)

    # Success?
    if final_op == ops.SUCCESS:
        self.logger.info("%s: total success", label)
        return (True, responses.OK_TOTAL)

    # We shouldn't be able to ever reach this point
    msg = "%s: unexpected state" % label
    self.logger.fatal(msg)
    raise HSM2DongleError(msg)
1336 # Send an individual block header to the device, including computing
1337 # and sending metadata
1338 # This is used both for advance blockchain (block and brother headers)
1339 # and update ancestor given the protocol is very similar
def _send_block_header(
    self,
    operation_name,
    header_name,
    block,
    command,
    ops,
    op_meta,
    op_chunk,
    responses,
    errors,
    chunk_error_mapping
):
    """Send a single block (or brother) header to the device.

    First computes and sends the header metadata, then sends the header
    itself in chunks as requested by the device.

    Returns ``(False, response_code)`` on failure, with the code taken
    from ``responses`` (through ``chunk_error_mapping`` for device errors
    raised while sending chunks). On success, returns the
    ``(True, raw_response)`` tuple produced by ``_send_data_in_chunks``.
    """
    label = operation_name.capitalize()
    header_label = header_name.capitalize()

    # A. Compute and send block metadata
    # (computing it also validates the header; a ValueError here is
    # reported as a metadata computation error)
    try:
        # RLP payload size for merge mining hash
        mm_payload_size = rlp_mm_payload_size(block)
        self.logger.debug(
            "%s metadata: MM payload length %d", header_label, mm_payload_size
        )
        size_bytes = mm_payload_size.to_bytes(2, byteorder="big", signed=False)

        # Coinbase transaction hash, only sent when advancing the blockchain
        if command == self.CMD.ADVANCE:
            cb_txn_hash = bytes.fromhex(
                coinbase_tx_get_hash(get_coinbase_txn(block))
            )
            self.logger.debug(
                "%s Metadata: CB txn hash: %s", header_label, cb_txn_hash.hex()
            )
        else:
            cb_txn_hash = bytes([])

        # Wrap and send
        metadata = bytes([op_meta]) + size_bytes + cb_txn_hash
        self.logger.info(
            "%s: sending %s metadata - %s", label, header_name, metadata.hex()
        )
        meta_reply = self._send_command(command, metadata)

        # The device must ask for a header chunk next
        if meta_reply[self.OFF.OP] != op_chunk:
            self.logger.error(
                "%s: unexpected response %s", label, meta_reply.hex()
            )
            return (False, responses.ERROR_UNEXPECTED)

        # How many bytes to send as the first chunk
        first_chunk_size = meta_reply[self.OFF.DATA]
    except ValueError as e:
        self.logger.error("Computing %s metadata: %s", header_name, str(e))
        return (False, responses.ERROR_COMPUTE_METADATA)
    except HSM2DongleErrorResult as e:
        self.logger.error("%s returned: %s", label, hex(e.error_code))
        if e.error_code == errors.PROT_INVALID:
            return (False, responses.ERROR_METADATA)
        return (False, responses.ERROR_UNEXPECTED)

    # B. Send block data in chunks
    # Operations the device may legitimately request once it is done with
    # this header; they depend on the command and on the kind of header
    next_operations = [op_chunk, op_meta, ops.SUCCESS]
    if command == self.CMD.ADVANCE:
        next_operations.append(ops.PARTIAL)
        if header_name == "block":
            next_operations.append(ops.BROTHER_LIST_META)
        if header_name == "brother":
            next_operations.append(ops.HEADER_META)

    try:
        result = self._send_data_in_chunks(
            command=command,
            operation=op_chunk,
            next_operations=next_operations,
            data=bytes.fromhex(block),
            expect_full_data=False,
            initial_bytes=first_chunk_size,
            operation_name=operation_name,
            data_description=header_name,
        )
    except HSM2DongleErrorResult as e:
        self.logger.error("%s returned: %s", label, hex(e.error_code))
        return (
            False,
            chunk_error_mapping.get(e.error_code, responses.ERROR_UNEXPECTED),
        )

    if not result[0]:
        return (False, responses.ERROR_UNEXPECTED)
    return result
1445 # Send a specific piece of data in chunks to the device
1446 # as the device requests bytes from it.
1447 # Validate responses wrt current operation and next possible expected operations
1448 # Exceptions are to be handled by the caller
def _send_data_in_chunks(
    self,
    command,
    operation,
    next_operations,
    data,
    expect_full_data,
    initial_bytes,
    operation_name,
    data_description,
):
    """Send ``data`` to the device in chunks of the size it requests.

    command: command under which every chunk is sent.
    operation: operation byte prepended to every chunk; while the device
        keeps answering with it, more chunks are sent.
    next_operations: list of operations the device may answer with to
        signal that it is done with this piece of data.
    data: bytes to send.
    expect_full_data: when true, finishing before all of ``data`` was
        sent is reported as a failure.
    initial_bytes: size of the first chunk.
    operation_name, data_description: used for logging only.

    Returns ``(True, response)`` on success and ``(False, response)`` on
    an unexpected answer, ``response`` being the last raw device response.
    Exceptions are to be handled by the caller.
    """
    label = operation_name.capitalize()
    accepted_operations = [operation] + next_operations
    offset = 0  # also the total number of bytes sent so far
    bytes_requested = initial_bytes

    while True:
        chunk = data[offset:offset + bytes_requested]
        self.logger.debug(
            "%s: sending %s chunk [%d:%d] - %s",
            label,
            data_description,
            offset,
            offset + len(chunk),
            chunk.hex(),
        )
        response = self._send_command(command, bytes([operation]) + chunk)
        offset += len(chunk)

        # The device must ask either for the current operation or for
        # one of the possible next ones. Anything else is an error.
        requested_operation = response[self.OFF.OP]
        if requested_operation not in accepted_operations:
            # Read the operation through OFF.OP (as everywhere else)
            # rather than through a hard-coded response index
            self.logger.debug(
                "Current operation %s, next operations %s, ledger requesting %s",
                hex(operation),
                str(list(map(hex, next_operations))),
                hex(requested_operation),
            )
            self.logger.error(
                "%s: unexpected response %s", label, response.hex()
            )
            return (False, response)

        # We finish when the device requests the next piece of data
        if requested_operation != operation:
            break

        # How many bytes to send in the next message
        bytes_requested = response[self.OFF.DATA]
        self.logger.debug("Dongle requested %d bytes", bytes_requested)

    # Have we finished but not sent all data when required to do so?
    if expect_full_data and offset < len(data):
        self.logger.error(
            "%s: expected to send all %d data bytes but sent %d and got %s",
            label,
            len(data),
            offset,
            response.hex()
        )
        return (False, response)

    # All is good
    return (True, response)