Coverage for ledger/hsm2dongle.py: 90%
684 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23import struct
24from enum import IntEnum, Enum, auto
25from ledgerblue.comm import getDongle
26from ledgerblue.commException import CommException
27import hid
28from .signature import HSM2DongleSignature
29from .version import HSM2FirmwareVersion
30from .parameters import HSM2FirmwareParameters
31from .hsm2dongle_cmds import HSM2SignerHeartbeat, HSM2UIHeartbeat, PowHsmAttestation
32from .block_utils import (
33 rlp_mm_payload_size,
34 remove_mm_fields_if_present,
35 get_coinbase_txn,
36 get_block_hash,
37)
38from comm.bitcoin import encode_varint
39from comm.pow import coinbase_tx_get_hash
40import logging
42# Enumerations
44# Dongle commands
class _Command(IntEnum):
    """Command identifiers placed in the CMD byte of a dongle request."""

    IS_ONBOARD = 0x06
    # 0x02 is ECHO for the UI and SIGN for the Signer. Because both share
    # the same value, SIGN is an enum alias of ECHO.
    ECHO = 0x02
    SIGN = 0x02
    GET_PUBLIC_KEY = 0x04
    SEND_PIN = 0x41
    UNLOCK = 0xFE
    CHANGE_PIN = 0x08
    GET_MODE = 0x43
    EXIT_MENU = 0xFF
    EXIT_MENU_NO_AUTOEXEC = 0xFA
    GET_STATE = 0x20
    RESET_AB = 0x21
    ADVANCE = 0x10
    UPD_ANCESTOR = 0x30
    GET_PARAMETERS = 0x11
    SEED = 0x44
    WIPE = 0x07
    UI_ATT = 0x50
    SIGNER_AUTH = 0x51
    RETRIES = 0x45
70# Sign command OPs
class _SignOps(IntEnum):
    """Operation codes exchanged during the sign command."""

    PATH = 0x01
    BTC_TX = 0x02
    TX_RECEIPT = 0x04
    MERKLE_PROOF = 0x08
    SUCCESS = 0x81
79# Get blockchain state command OPs
class _GetStateOps(IntEnum):
    """Operation codes of the get blockchain state command."""

    HASH = 0x01
    DIFF = 0x02
    FLAGS = 0x03
86# Reset advance blockchain command OPs
class _ResetAdvanceOps(IntEnum):
    """Operation codes of the reset advance blockchain command."""

    INIT = 0x01
    DONE = 0x02
92# Advance blockchain command OPs
class _AdvanceOps(IntEnum):
    """Operation codes of the advance blockchain command."""

    INIT = 0x02
    HEADER_META = 0x03
    HEADER_CHUNK = 0x04
    PARTIAL = 0x05
    SUCCESS = 0x06
    BROTHER_LIST_META = 0x07
    BROTHER_META = 0x08
    BROTHER_CHUNK = 0x09
104# Update ancestor command OPs
class _UpdateAncestorOps(IntEnum):
    """Operation codes of the update ancestor command."""

    INIT = 0x02
    HEADER_META = 0x03
    HEADER_CHUNK = 0x04
    SUCCESS = 0x05
112# UI attestation OPs
class _UIAttestationOps(IntEnum):
    """Operation codes of the UI attestation command."""

    OP_UD_VALUE = 0x01
    OP_GET_MSG = 0x02
    OP_GET = 0x03
    OP_APP_HASH = 0x04
120# Signer authorization OPs (and results)
class _SignerAuthorizationOps(IntEnum):
    """Operation codes (and result codes) of the signer authorization command.

    The result codes reuse the values of the operation codes, so within this
    enum OP_SIGN_RES_MORE is an alias of OP_SIGVER and OP_SIGN_RES_SUCCESS is
    an alias of OP_SIGN.
    """

    # Operations
    OP_SIGVER = 0x01
    OP_SIGN = 0x02
    # Results
    OP_SIGN_RES_MORE = 0x01
    OP_SIGN_RES_SUCCESS = 0x02
128# Command Ops
class _Ops:
    """Namespace mapping each command to its operation code enumeration."""

    SIGN = _SignOps
    GST = _GetStateOps
    RAV = _ResetAdvanceOps
    ADVANCE = _AdvanceOps
    UPD_ANCESTOR = _UpdateAncestorOps
    UI_ATT = _UIAttestationOps
    SIGNER_AUTH = _SignerAuthorizationOps
139# Protocol offsets
class _Offset(IntEnum):
    """Byte positions of the protocol fields within a message."""

    CLA = 0
    CMD = 1
    OP = 2
    DATA = 3
147# Device modes
class _Mode(IntEnum):
    """Device modes."""

    BOOTLOADER = 0x02
    SIGNER = 0x03
    UI_HEARTBEAT = 0x04
    UNKNOWN = 0xFF
155# Get blockchain state flag indexes
class _GetStateFlagOffset(IntEnum):
    """Index of each flag within the get blockchain state flags response."""

    IN_PROGRESS = 0
    ALREADY_VALIDATED = 1
    FOUND_BEST_BLOCK = 2
162# Get blockchain state constants
class _GetState:
    """Constants used by the get blockchain state command."""

    # Positions of the individual flags in the flags response
    FLAG_OFFSET = _GetStateFlagOffset

    # State key -> selector byte sent to the device to request that hash
    HASH_VALUES = {
        "best_block": 0x01,
        "newest_valid_block": 0x02,
        "ancestor_block": 0x03,
        "ancestor_receipts_root": 0x05,
        "updating.best_block": 0x81,
        "updating.newest_valid_block": 0x82,
        "updating.next_expected_block": 0x84,
    }
176# Sign command errors
class _SignError(IntEnum):
    """Error codes of the sign command.

    Only the first code is spelled out; every following member takes the
    next consecutive value through auto().
    """

    DATA_SIZE = 0x6A87
    INPUT = auto()
    STATE = auto()
    RLP = auto()
    RLP_INT = auto()
    RLP_DEPTH = auto()
    TX_HASH_MISMATCH = auto()
    TX_VERSION = auto()
    INVALID_PATH = auto()
    DATA_SIZE_AUTH = auto()
    DATA_SIZE_NOAUTH = auto()
    NODE_VERSION = auto()
    SHARED_PREFIX_TOO_BIG = auto()
    RECEIPT_HASH_MISMATCH = auto()
    NODE_CHAINING_MISMATCH = auto()
    RECEIPT_ROOT_MISMATCH = auto()
    INVALID_SIGHASH_COMPUTATION_MODE = auto()
    INVALID_EXTRADATA_SIZE = auto()
198# Get public key command errors
class _GetPubKeyError(IntEnum):
    """Error codes of the get public key command."""

    DATA_SIZE = 0x6A87
203# Advance blockchain and update ancestor command errors
class _AdvanceUpdateError(IntEnum):
    """Error codes shared by the advance blockchain and update ancestor commands.

    After PROT_INVALID, every member takes the next consecutive value
    through auto().
    """

    UNKNOWN = 0
    PROT_INVALID = 0x6B87
    RLP_INVALID = auto()
    BLOCK_TOO_OLD = auto()
    BLOCK_TOO_SHORT = auto()
    PARENT_HASH_INVALID = auto()
    RECEIPT_ROOT_INVALID = auto()
    BLOCK_NUM_INVALID = auto()
    BLOCK_DIFF_INVALID = auto()
    UMM_ROOT_INVALID = auto()
    BTC_HEADER_INVALID = auto()
    MERKLE_PROOF_INVALID = auto()
    BTC_CB_TXN_INVALID = auto()
    MM_RLP_LEN_MISMATCH = auto()
    BTC_DIFF_MISMATCH = auto()
    MERKLE_PROOF_MISMATCH = auto()
    MM_HASH_MISMATCH = auto()
    MERKLE_PROOF_OVERFLOW = auto()
    CB_TXN_OVERFLOW = auto()
    BUFFER_OVERFLOW = auto()
    CHAIN_MISMATCH = auto()
    TOTAL_DIFF_OVERFLOW = auto()
    ANCESTOR_TIP_MISMATCH = auto()
    CB_TXN_HASH_MISMATCH = auto()
    BROTHERS_TOO_MANY = auto()
    BROTHER_PARENT_MISMATCH = auto()
    BROTHER_SAME_AS_BLOCK = auto()
    BROTHER_ORDER_INVALID = auto()
class _UIError(IntEnum):
    """Error codes returned by the UI."""

    INVALID_PIN = 0x69A0
class _UIAttestationError(IntEnum):
    """Error codes of the UI attestation command."""

    PROT_INVALID = 0x6A01
    NO_ONBOARD = 0x6A02
    INTERNAL = 0x6A99
class _SignerAuthorizationError(IntEnum):
    """Error codes of the signer authorization command."""

    PROT_INVALID = 0x6A01
    INVALID_ITERATION = 0x6A03
    INVALID_SIGNATURE = 0x6A04
    INVALID_AUTH_INVALID_INDEX = 0x6A05
252# Error codes
class _Error:
    """Namespace mapping each command to its error code enumeration."""

    SIGN = _SignError
    GETPUBKEY = _GetPubKeyError
    # Advance blockchain and update ancestor share one set of error codes
    ADVANCE = _AdvanceUpdateError
    UPD_ANCESTOR = _AdvanceUpdateError
    UI = _UIError
    UI_ATT = _UIAttestationError
    SIGNER_AUTH = _SignerAuthorizationError

    @staticmethod
    def is_user_defined_error(code):
        """Tell whether a code is a user-defined (RSK firmware) error code.

        A code qualifies when it lies in the inclusive range 0x69A0-0x6BFF
        or equals 0x6D00.
        """
        return 0x69A0 <= code <= 0x6BFF or code == 0x6D00
269# Sign command responses to the user
class _SignResponse(IntEnum):
    """Failure codes reported to the user by the sign operations."""

    ERROR_PATH = -1
    ERROR_BTC_TX = -2
    ERROR_TX_RECEIPT = -3
    ERROR_MERKLE_PROOF = -4
    ERROR_HASH = -5
    ERROR_UNEXPECTED = -10
279# Advance blockchain responses to the user
class _AdvanceResponse(IntEnum):
    """Result codes reported to the user by the advance blockchain operation."""

    # Successful outcomes
    OK_TOTAL = 1
    OK_PARTIAL = 2

    # Failures
    ERROR_INIT = -1
    ERROR_COMPUTE_METADATA = -2
    ERROR_METADATA = -3
    ERROR_BLOCK_DATA = -4
    ERROR_INVALID_BLOCK = -5
    ERROR_POW_INVALID = -6
    ERROR_CHAINING_MISMATCH = -7
    ERROR_UNSUPPORTED_CHAIN = -8
    ERROR_INVALID_BROTHERS = -9
    ERROR_UNEXPECTED = -10
296# Update ancestor responses to the user
class _UpdateAncestorResponse(IntEnum):
    """Result codes reported to the user by the update ancestor operation."""

    # Successful outcome
    OK_TOTAL = 1

    # Failures
    ERROR_INIT = -1
    ERROR_COMPUTE_METADATA = -2
    ERROR_METADATA = -3
    ERROR_BLOCK_DATA = -4
    ERROR_INVALID_BLOCK = -5
    ERROR_CHAINING_MISMATCH = -6
    ERROR_TIP_MISMATCH = -7
    ERROR_REMOVE_MM_FIELDS = -8
    ERROR_UNEXPECTED = -10
311# Responses
class _Response:
    """Namespace mapping each operation to its user-facing response codes."""

    SIGN = _SignResponse
    ADVANCE = _AdvanceResponse
    UPD_ANCESTOR = _UpdateAncestorResponse
318# Onboarding constants
class _Onboarding(IntEnum):
    """Onboarding constants."""

    # Exact number of bytes a seed passed to onboard() must have
    SEED_LENGTH = 32
    # Timeout passed along with the wipe command during onboarding
    TIMEOUT = 10
324# Sighash computation modes
class SighashComputationMode(Enum):
    """Sighash computation modes.

    Each member is declared as a (value, netvalue) pair: the first item
    becomes the regular enum value and the second is stored in the
    `netvalue` attribute, which is the number sent to the device.
    """

    def __new__(cls, *args, **kwds):
        member = object.__new__(cls)
        member._value_, member.netvalue = args[0], args[1]
        return member

    LEGACY = "legacy", 0
    SEGWIT = "segwit", 1
class HSM2DongleBaseError(RuntimeError):
    """Base class of the errors raised by the dongle layer."""

    @property
    def message(self):
        """First argument the error was built with, or None if there is none."""
        return self.args[0] if self.args else None
class HSM2DongleError(HSM2DongleBaseError):
    """Generic dongle error; adds nothing to the base class."""
class HSM2DongleTimeoutError(HSM2DongleBaseError):
    """Error raised when an exchange with the dongle times out."""

    @staticmethod
    def is_timeout(exc):
        """Tell whether the given exception represents a dongle timeout.

        That is the case for an exception whose type is exactly
        CommException, with status word 0x6F00 and message "Timeout".
        """
        if type(exc) is not CommException:
            return False
        return bool(exc.sw == 0x6F00 and exc.message == "Timeout")
class HSM2DongleCommError(HSM2DongleBaseError):
    """Error raised on a communication problem with the dongle."""

    @staticmethod
    def is_comm_error(exc):
        """Tell whether the given exception represents a communication problem.

        This holds for:
        - an exception of exactly type BaseException whose only argument
          is "Error while writing";
        - an exception of exactly type OSError whose only argument
          is "read error";
        - any instance of HSM2DongleCommError.
        """
        # Exact exception type -> the single argument that identifies it
        known_failures = {
            BaseException: "Error while writing",
            OSError: "read error",
        }

        expected = known_failures.get(type(exc))
        if (
            expected is not None
            and len(exc.args) == 1
            and exc.args[0] == expected
        ):
            return True

        return isinstance(exc, HSM2DongleCommError)
class HSM2DongleErrorResult(HSM2DongleBaseError):
    """Error carrying an error code returned by the dongle."""

    @property
    def error_code(self):
        """The error code, i.e. the first argument the error was built with."""
        return self.args[0]

    def __str__(self):
        return "Dongle returned error code " + hex(self.error_code)
381# Handles low-level communication with a powHSM dongle
382class HSM2Dongle:
383 # Ledger constants
384 HASH_SIZE = 32
386 # APDU prefix
387 CLA = 0x80
389 # Enumeration shorthands
390 OFF = _Offset
391 CMD = _Command
392 MODE = _Mode
393 OP = _Ops
394 ERR = _Error
395 GST = _GetState
396 RESPONSE = _Response
397 ONBOARDING = _Onboarding
399 # Dongle exchange timeout
400 DONGLE_TIMEOUT = 10 # seconds
402 # Maximum pages expected to conform the UI attestation message
403 MAX_PAGES_UI_ATT_MESSAGE = 4
405 # Size of the iteration parameter for the signer authorization
406 SIGNER_AUTH_ITERATION_SIZE = 2
408 # Shorthand for externally defined commands
409 ErrorResult = HSM2DongleErrorResult
def __init__(self, debug):
    """Create a dongle handler that is not yet connected.

    debug: forwarded to getDongle() when connect() is called.
    """
    self.logger = logging.getLogger("dongle")
    self.debug = debug
    # No connection exists until connect() succeeds. Defining the attribute
    # here lets disconnect() be called safely before connect(), or after a
    # failed connect(), instead of failing with AttributeError.
    self.dongle = None
    self.last_comm_exception = None
416 # Send command to device
def _send_command(self, command, data=b"", timeout=DONGLE_TIMEOUT):
    """Send a command to the device and return its raw response.

    command: command byte, sent right after the CLA prefix.
    data: bytes appended after the command byte.
    timeout: timeout handed to the underlying exchange.

    Raises:
        HSM2DongleErrorResult: the device answered with a user-defined
            error code.
        HSM2DongleTimeoutError: the exchange timed out.
        HSM2DongleCommError: there was a communication problem.
        HSM2DongleError: any other failure.

    Every raised error is chained to the exception that originated it, and
    the last CommException seen is kept in `last_comm_exception`.
    """
    self.last_comm_exception = None
    try:
        cmd = struct.pack("BB%ds" % len(data), self.CLA, command, data)
        self.logger.debug("Sending command: 0x%s", cmd.hex())
        result = self.dongle.exchange(cmd, timeout=timeout)
        self.logger.debug("Received: 0x%s", result.hex())
    except BaseException as e:
        # BaseException is caught on purpose: is_comm_error() recognises
        # a bare BaseException as one of the communication failures.
        is_comm_exception = type(e) is CommException

        # If this is a user-defined error, raise an error result error
        if is_comm_exception:
            self.last_comm_exception = e
            error_code = e.sw
            if _Error.is_user_defined_error(error_code):
                self.logger.error("Received error code: %s", hex(error_code))
                raise HSM2DongleErrorResult(error_code) from e

        # If this is a dongle timeout, raise a timeout error
        if HSM2DongleTimeoutError.is_timeout(e):
            raise HSM2DongleTimeoutError(str(e)) from e

        # If this is a dongle communication problem, raise a comm error
        if HSM2DongleCommError.is_comm_error(e):
            raise HSM2DongleCommError(str(e)) from e

        # Raise a standard error, but report differently for a
        # CommException and any other type of exception
        if is_comm_exception:
            msg = "Error sending command: %s" % str(e)
            self.logger.error(msg)
        else:
            msg = "Unknown error sending command: %s (of type %s)" % \
                (str(e), type(e).__name__)
            self.logger.critical(msg)

        raise HSM2DongleError(msg) from e

    return result
457 # Send command version to be used by command classes
def send_command(self, cmd, op, data, timeout=DONGLE_TIMEOUT):
    """Send a command with an operation byte; meant for the command classes.

    The operation byte is placed in front of `data` and the result of
    _send_command() is returned unchanged.
    """
    payload = bytes((op,)) + data
    return self._send_command(cmd, payload, timeout)
461 # Connect to the dongle
def connect(self):
    """Connect to the dongle.

    Raises:
        HSM2DongleCommError: if getDongle() fails with a CommException.
            The original exception is chained as the cause.
    """
    try:
        self.logger.info("Connecting")
        self.dongle = getDongle(self.debug)
        self.logger.info("Connected")
    except CommException as e:
        msg = "Error connecting: %s" % e.message
        self.logger.error(msg)
        raise HSM2DongleCommError(msg) from e
472 # Disconnect from dongle
def disconnect(self):
    """Disconnect from the dongle, closing it if it is open.

    Safe to call when connect() was never called or did not succeed.

    Raises:
        HSM2DongleCommError: if a CommException occurs while disconnecting.
            The original exception is chained as the cause.
    """
    try:
        self.logger.info("Disconnecting")
        # The attribute may not exist if connect() never assigned it
        dongle = getattr(self, "dongle", None)
        if dongle and dongle.opened:
            dongle.close()
        # **** Begin hack ****
        # When running within a docker container,
        # the hidapi library fails to detect a physical
        # usb device reconnection. This will "hard reset" the
        # stack so that a potential physical device reconnection
        # can be detected.
        try:
            hid.hidapi_exit()
        except Exception:
            # hidapi_exit() can sometimes throw. we don't care
            pass
        # **** End hack ****
        self.logger.info("Disconnected")
    except CommException as e:
        msg = "Error disconnecting: %s" % e.message
        self.logger.error(msg)
        raise HSM2DongleCommError(msg) from e
496 # Return device mode
def get_current_mode(self):
    """Return the current device mode.

    The mode is read from the second byte of the GET_MODE response.
    MODE.UNKNOWN is returned if the command fails with HSM2DongleError.
    """
    try:
        reply = self._send_command(self.CMD.GET_MODE)
    except HSM2DongleError:
        return self.MODE.UNKNOWN
    return self.MODE(reply[1])
504 # Echo message
def echo(self):
    """Send a fixed three-byte message with the ECHO command.

    Returns True when the response is exactly the CLA byte, the ECHO
    command byte and the message that was sent.
    """
    probe = b"\x41\x42\x43"
    reply = bytes(self._send_command(self.CMD.ECHO, probe))
    # The response should be the command plus the message
    return reply == bytes([self.CLA, self.CMD.ECHO]) + probe
512 # Return true if the hsm2 is onboarded
def is_onboarded(self):
    """Return True if the hsm2 is onboarded.

    The device is considered onboarded when the second byte of the
    IS_ONBOARD response equals 1.
    """
    self.logger.info("Sending isOnboarded")
    reply = self._send_command(self.CMD.IS_ONBOARD)
    onboarded = reply[1] == 1
    answer = "yes" if onboarded else "no"
    self.logger.info("isOnboarded: %s", answer)
    return onboarded
# Attempt to onboard the device using the given seed and pin
# Return True on success
def onboard(self, seed, pin):
    """Onboard the device with the given seed and pin.

    seed: bytes object of exactly ONBOARDING.SEED_LENGTH bytes.
    pin: the pin, sent to the device preceded by its length.

    Returns True on success.

    Raises:
        HSM2DongleError: if the seed is invalid, or if the second byte of
            the response to the wipe command is not 2.
    """
    seed_is_valid = (
        type(seed) is bytes and len(seed) == self.ONBOARDING.SEED_LENGTH
    )
    if not seed_is_valid:
        raise HSM2DongleError("Invalid seed given")

    # The seed travels one byte per command: (position, value)
    self.logger.info("Sending seed")
    for position, value in enumerate(seed):
        self._send_command(self.CMD.SEED, bytes([position, value]))

    self.logger.info("Sending pin")
    self._send_pin(pin, True)

    self.logger.info("Sending wipe")
    reply = self._send_command(self.CMD.WIPE, timeout=self.ONBOARDING.TIMEOUT)
    if reply[1] != 2:
        raise HSM2DongleError("Error onboarding. Got '%s'" % reply.hex())

    return True
541 # send PIN to device, optionally prepending its length
def _send_pin(self, pin, prepend_length):
    """Send the pin to the device, one byte per SEND_PIN command.

    pin: the pin bytes.
    prepend_length: when true, a byte holding len(pin) is sent first.

    Each command carries two bytes: the position and the value.
    """
    payload = bytes([len(pin)]) + pin if prepend_length else pin

    for position, value in enumerate(payload):
        self._send_command(self.CMD.SEND_PIN, bytes([position, value]))
550 # unlock the device with the PIN sent
def unlock(self, pin):
    """Unlock the device with the given pin.

    The pin is sent first (without a length prefix), followed by the
    UNLOCK command. Returns True if the device got unlocked, False if the
    pin was wrong.
    """
    self._send_pin(pin, prepend_length=False)
    reply = self._send_command(self.CMD.UNLOCK, bytes(2))

    # Zero indicates wrong pin. Nonzero indicates device unlocked
    return reply[2] != 0
559 # replace PIN with a new one
def new_pin(self, pin):
    """Replace the device pin with a new one.

    The pin is sent preceded by its length, followed by the CHANGE_PIN
    command. Returns True on success and False if the device rejects the
    pin as invalid. Any other error result is propagated to the caller.
    """
    try:
        self._send_pin(pin, prepend_length=True)
        self._send_command(self.CMD.CHANGE_PIN)
    except HSM2DongleErrorResult as error:
        # Tried to set an invalid pin
        if error.error_code == self.ERR.UI.INVALID_PIN:
            return False
        # Something else happened
        raise

    # All is good
    return True
574 # returns an instance of HSM2FirmwareVersion representing
575 # the version of the currently running firmware on the HSM2
576 # that is connected (i.e., could be either the signer or ui)
def get_version(self):
    """Return the version of the firmware currently running on the device.

    The result is an HSM2FirmwareVersion built from bytes 2, 3 and 4 of
    the IS_ONBOARD response.
    """
    reply = self._send_command(self.CMD.IS_ONBOARD)
    first, second, third = reply[2], reply[3], reply[4]
    return HSM2FirmwareVersion(first, second, third)
581 # returns the number of pin retries available
def get_retries(self):
    """Return the number of pin retries available.

    The value is the third byte of the RETRIES response.
    """
    reply = self._send_command(self.CMD.RETRIES)
    retries_left = reply[2]
    return retries_left
586 # returns an instance of HSM2FirmwareParameters representing
587 # the parameters of the currently running firmware on the HSM2
588 # that is connected (it should be running the signer).
def get_signer_parameters(self):
    """Return the parameters of the firmware running on the device.

    The result is an HSM2FirmwareParameters instance built from the data
    part of the GET_PARAMETERS response (the device should be running the
    signer).

    Raises:
        HSM2DongleError: if a ValueError occurs while getting or parsing
            the parameters. The original exception is chained as the cause.
    """
    try:
        apdu_rcv = self._send_command(self.CMD.GET_PARAMETERS)
        return HSM2FirmwareParameters.from_dongle_format(apdu_rcv[self.OFF.DATA:])
    except ValueError as e:
        msg = "While getting signer firmware parameters: %s" % str(e)
        self.logger.error(msg)
        raise HSM2DongleError(msg) from e
598 # exit the ledger nano S menu
def exit_menu(self, autoexec=True):
    """Exit the ledger nano S menu.

    autoexec: selects the command sent - EXIT_MENU when true,
        EXIT_MENU_NO_AUTOEXEC otherwise. Two zero bytes are sent as data
        in both cases.
    """
    if autoexec:
        command = self.CMD.EXIT_MENU
    else:
        command = self.CMD.EXIT_MENU_NO_AUTOEXEC

    self._send_command(command, bytes(2))
605 # exit the current app
606 # could either be the UI bootloader, UI heartbeat or Signer
def exit_app(self):
    """Exit the current app by sending the EXIT_MENU command without data.

    The app could be the UI bootloader, the UI heartbeat or the Signer.
    """
    exit_command = self.CMD.EXIT_MENU
    self._send_command(exit_command)
610 # get the public key for a bip32 path
611 # key_id: BIP32Path
def get_public_key(self, key_id):
    """Get the public key for a bip32 path.

    key_id: BIP32Path; its binary form is sent as the command data.

    Returns the hex encoding of the whole response.
    """
    path_bytes = key_id.to_binary()
    return self._send_command(self.CMD.GET_PUBLIC_KEY, path_bytes).hex()
616 # Ask the device to sign a specific input of a given unsigned bitcoin transaction
617 # using the given RSK transaction receipt as an authorization for the signature.
618 # key_id: BIP32Path
619 # rsk_tx_receipt: hex string
620 # btc_tx: hex string
621 # receipt_merkle_proof: list
622 # input_index: int
623 # sighash_computation_mode: a SighashComputationMode instance
624 # witness_script: hex string
625 # outpoint_value: int
def sign_authorized(
    self, key_id, rsk_tx_receipt, receipt_merkle_proof, btc_tx, input_index,
    sighash_computation_mode, witness_script, outpoint_value
):
    """Sign an input of a bitcoin transaction, authorized by an RSK receipt.

    key_id: BIP32Path
    rsk_tx_receipt: hex string
    receipt_merkle_proof: list of hex strings, one per node
    btc_tx: hex string
    input_index: int
    sighash_computation_mode: a SighashComputationMode instance
    witness_script: hex string (only read in SEGWIT mode)
    outpoint_value: int (only read in SEGWIT mode)

    Returns (True, HSM2DongleSignature) on success, or
    (False, <RESPONSE.SIGN error code>) on failure.

    The data is sent in the order the device asks for it:
      1. BIP32 path & BTC tx input index (single message)
      2. BTC transaction (in chunks)
      3. RSK transaction receipt (in chunks)
      4. RSK tx receipt merkle proof (in chunks)
    Error codes delivered through HSM2DongleErrorResult are translated into
    the failure code of the step at which they happen. Any other exception
    is left for the calling layer to handle.
    """
    ops = self.OP.SIGN
    errors = self.ERR.SIGN
    failure = self.RESPONSE.SIGN

    # Step 1. Send path and input index
    path_message = (
        bytes([ops.PATH])
        + key_id.to_binary()
        + input_index.to_bytes(4, byteorder="little", signed=False)
    )
    try:
        self.logger.debug("Sign: sending path - %s", path_message.hex())
        reply = self._send_command(self.CMD.SIGN, path_message)

        # The device is expected to ask for the BTC tx next
        if reply[self.OFF.OP] != ops.BTC_TX:
            self.logger.error("Sign: unexpected response %s", reply.hex())
            return (False, failure.ERROR_UNEXPECTED)

        # How many bytes to send in the next message
        bytes_requested = reply[self.OFF.DATA]
    except HSM2DongleErrorResult as e:
        self.logger.error("Sign returned: %s", hex(e.error_code))
        if e.error_code in (
            errors.DATA_SIZE,
            errors.DATA_SIZE_AUTH,
            errors.DATA_SIZE_NOAUTH,
        ):
            return (False, failure.ERROR_PATH)
        return (False, failure.ERROR_UNEXPECTED)

    # Step 2. Send BTC transaction and extra data.
    # Layout of what is sent:
    #   payload length (4 bytes, little endian) | sighash computation mode
    #   (1 byte) | extradata length (2 bytes, little endian) | BTC tx |
    #   extradata
    # The payload length counts its own 4 bytes, the mode byte, the 2
    # extradata length bytes and the BTC tx; the extradata is not counted.
    try:
        length_field_size = 4
        mode_field_size = 1
        extradata_length_field_size = 2
        outpoint_value_size = 8

        tx_bytes = bytes.fromhex(btc_tx)

        mode_bytes = sighash_computation_mode.netvalue.to_bytes(
            mode_field_size, byteorder="little", signed=False
        )

        # Extradata is only present in segwit mode:
        # varint script length | witness script | outpoint value
        extradata = b""
        if sighash_computation_mode == SighashComputationMode.SEGWIT:
            value_bytes = outpoint_value.to_bytes(
                outpoint_value_size, byteorder="little", signed=False
            )
            script_bytes = bytes.fromhex(witness_script)
            script_length_bytes = bytes.fromhex(encode_varint(len(script_bytes)))
            extradata = script_length_bytes + script_bytes + value_bytes

        extradata_length_bytes = len(extradata).to_bytes(
            extradata_length_field_size, byteorder="little", signed=False
        )

        declared_length = (
            length_field_size
            + mode_field_size
            + extradata_length_field_size
            + len(tx_bytes)
        )
        declared_length_bytes = declared_length.to_bytes(
            length_field_size, byteorder="little", signed=False
        )

        tx_message = (
            declared_length_bytes
            + mode_bytes
            + extradata_length_bytes
            + tx_bytes
            + extradata
        )

        outcome = self._send_data_in_chunks(
            command=self.CMD.SIGN,
            operation=ops.BTC_TX,
            next_operations=[ops.TX_RECEIPT],
            data=tx_message,
            expect_full_data=True,
            initial_bytes=bytes_requested,
            operation_name="sign",
            data_description="BTC tx",
        )

        if not outcome[0]:
            return (False, failure.ERROR_UNEXPECTED)

        bytes_requested = outcome[1][self.OFF.DATA]
    except HSM2DongleErrorResult as e:
        self.logger.error("Sign returned: %s", hex(e.error_code))
        if e.error_code in (
            errors.INPUT,
            errors.DATA_SIZE,
            errors.TX_HASH_MISMATCH,
            errors.TX_VERSION,
            errors.INVALID_SIGHASH_COMPUTATION_MODE,
            errors.INVALID_EXTRADATA_SIZE,
        ):
            return (False, failure.ERROR_BTC_TX)
        return (False, failure.ERROR_UNEXPECTED)

    # Step 3. Send transaction receipt
    try:
        outcome = self._send_data_in_chunks(
            command=self.CMD.SIGN,
            operation=ops.TX_RECEIPT,
            next_operations=[ops.MERKLE_PROOF],
            data=bytes.fromhex(rsk_tx_receipt),
            expect_full_data=True,
            initial_bytes=bytes_requested,
            operation_name="sign",
            data_description="tx receipt",
        )

        if not outcome[0]:
            return (False, failure.ERROR_UNEXPECTED)

        bytes_requested = outcome[1][self.OFF.DATA]
    except HSM2DongleErrorResult as e:
        self.logger.error("Sign returned: %s", hex(e.error_code))
        if e.error_code in (
            errors.STATE,
            errors.RLP,
            errors.RLP_INT,
            errors.RLP_DEPTH,
            errors.DATA_SIZE,
        ):
            return (False, failure.ERROR_TX_RECEIPT)
        return (False, failure.ERROR_UNEXPECTED)

    # Step 4. Send tx receipt merkle proof.
    # Format: 1 byte with the number of nodes, then for each node
    # 1 byte with the node length followed by the node bytes.
    try:
        if len(receipt_merkle_proof) > 255:
            raise ValueError("Too many nodes")

        proof_parts = [bytes([len(receipt_merkle_proof)])]
        for node in receipt_merkle_proof:
            node_bytes = bytes.fromhex(node)
            if len(node_bytes) > 255:
                raise ValueError("Node too big: %s" % node)
            proof_parts.append(bytes([len(node_bytes)]) + node_bytes)
        merkle_proof_bytes = b"".join(proof_parts)
    except ValueError as e:
        self.logger.error("Sign: invalid receipts merkle proof: %s", str(e))
        return (False, failure.ERROR_MERKLE_PROOF)

    try:
        outcome = self._send_data_in_chunks(
            command=self.CMD.SIGN,
            operation=ops.MERKLE_PROOF,
            next_operations=[ops.SUCCESS],
            data=merkle_proof_bytes,
            expect_full_data=True,
            initial_bytes=bytes_requested,
            operation_name="sign",
            data_description="receipts merkle proof",
        )

        if not outcome[0]:
            return (False, failure.ERROR_UNEXPECTED)
    except HSM2DongleErrorResult as e:
        self.logger.error("Sign returned: %s", hex(e.error_code))
        if e.error_code in (
            errors.DATA_SIZE,
            errors.STATE,
            errors.NODE_VERSION,
            errors.SHARED_PREFIX_TOO_BIG,
            errors.RECEIPT_HASH_MISMATCH,
            errors.NODE_CHAINING_MISMATCH,
            errors.RECEIPT_ROOT_MISMATCH,
        ):
            return (False, failure.ERROR_MERKLE_PROOF)
        return (False, failure.ERROR_UNEXPECTED)

    # At this point the data part of the last response should hold the
    # signature. Return success along with it.
    try:
        return (True, HSM2DongleSignature(outcome[1][self.OFF.DATA:]))
    except Exception as e:
        self.logger.error("Error parsing signature: %s", str(e))
        return (False, failure.ERROR_UNEXPECTED)
835 # Ask the device to sign a specific hash without any authorization.
836 # key_id: BIP32Path
837 # hash: hex string
def sign_unauthorized(self, key_id, hash):
    """Ask the device to sign a specific hash without any authorization.

    key_id: BIP32Path
    hash: hex string

    Returns (True, HSM2DongleSignature) on success, or
    (False, <RESPONSE.SIGN error code>) on failure.

    A single message carrying the BIP32 path and the hash is sent. Error
    codes delivered through HSM2DongleErrorResult are translated into a
    failure code. Any other exception raised while sending is left for the
    calling layer to handle.
    """
    ops = self.OP.SIGN
    errors = self.ERR.SIGN if False else None  # placeholder, set on demand
    failure = self.RESPONSE.SIGN

    try:
        hash_bytes = bytes.fromhex(hash)
    except ValueError:
        self.logger.error("Sign: invalid hash - %s", hash)
        return (False, failure.ERROR_HASH)

    # Send path and hash to sign
    try:
        message = bytes([ops.PATH]) + key_id.to_binary() + hash_bytes
        self.logger.debug("Sign: sending path and hash - %s", message.hex())
        reply = self._send_command(self.CMD.SIGN, message)
        reply_op = reply[self.OFF.OP]

        # Special case: if the device asks for a BTC transaction, then
        # there's a case of both invalid path and invalid hash.
        # Report invalid hash
        if reply_op == ops.BTC_TX:
            return (False, failure.ERROR_HASH)

        # Anything other than success signing is unexpected
        if reply_op != ops.SUCCESS:
            self.logger.error("Sign: unexpected response %s", reply.hex())
            return (False, failure.ERROR_UNEXPECTED)
    except HSM2DongleErrorResult as e:
        errors = self.ERR.SIGN
        self.logger.error("Sign returned: %s", hex(e.error_code))
        if e.error_code in (errors.DATA_SIZE, errors.DATA_SIZE_NOAUTH):
            return (False, failure.ERROR_HASH)
        if e.error_code in (errors.INVALID_PATH, errors.DATA_SIZE_AUTH):
            return (False, failure.ERROR_PATH)
        return (False, failure.ERROR_UNEXPECTED)

    # At this point the data part of the response should hold the
    # signature. Return success along with it.
    try:
        return (True, HSM2DongleSignature(reply[self.OFF.DATA:]))
    except Exception as e:
        self.logger.error("Error parsing signature: %s", str(e))
        return (False, failure.ERROR_UNEXPECTED)
def get_blockchain_state(self):
    """Read the blockchain state from the device.

    Returns a dict holding:
    - one hex string per key of GST.HASH_VALUES;
    - "updating.total_difficulty": the difficulty as an unsigned
      big-endian integer;
    - "updating.in_progress", "updating.already_validated" and
      "updating.found_best_block": booleans.

    Raises:
        HSM2DongleError: if any of the responses is invalid.
    """
    op_at = self.OFF.OP
    data_at = self.OFF.DATA
    gst_ops = self.OP.GST
    state = {}

    # Get hashes
    for name, selector in self.GST.HASH_VALUES.items():
        self.logger.info("Getting hash value for '%s'", name)
        reply = self._send_command(
            self.CMD.GET_STATE, bytes([gst_ops.HASH, selector])
        )

        # A valid response echoes the operation and the selector, and
        # then carries exactly one hash
        hash_part = reply[data_at + 1:]
        is_valid = (
            reply[op_at] == gst_ops.HASH
            and reply[data_at] == selector
            and len(hash_part) == self.HASH_SIZE
        )
        if not is_valid:
            msg = "Invalid response for hash: %s" % reply.hex()
            self.logger.error(msg)
            raise HSM2DongleError(msg)

        state[name] = hash_part.hex()

    # Get difficulty
    self.logger.info("Getting difficulty")
    reply = self._send_command(self.CMD.GET_STATE, bytes([gst_ops.DIFF]))
    if reply[op_at] != gst_ops.DIFF:
        msg = "Invalid response for difficulty: %s" % reply.hex()
        self.logger.error(msg)
        raise HSM2DongleError(msg)

    state["updating.total_difficulty"] = int.from_bytes(
        reply[data_at:], byteorder="big", signed=False
    )

    # Get flags
    self.logger.info("Getting flags")
    reply = self._send_command(self.CMD.GET_STATE, bytes([gst_ops.FLAGS]))
    if not (reply[op_at] == gst_ops.FLAGS and len(reply[data_at:]) == 3):
        msg = "Invalid response for flags: %s" % reply.hex()
        self.logger.error(msg)
        raise HSM2DongleError(msg)

    flag_at = self.GST.FLAG_OFFSET
    for key, position in (
        ("updating.in_progress", flag_at.IN_PROGRESS),
        ("updating.already_validated", flag_at.ALREADY_VALIDATED),
        ("updating.found_best_block", flag_at.FOUND_BEST_BLOCK),
    ):
        state[key] = bool(reply[data_at + position])

    return state
def reset_advance_blockchain(self):
    """Reset the advance blockchain operation on the device.

    Returns True on success.

    Raises:
        HSM2DongleError: if the device does not answer with the DONE
            operation.
    """
    self.logger.info("Resetting advance blockchain")
    reply = self._send_command(self.CMD.RESET_AB, bytes([self.OP.RAV.INIT]))

    if reply[self.OFF.OP] == self.OP.RAV.DONE:
        return True

    msg = "Invalid response for reset advance blockchain: %s" % reply.hex()
    self.logger.error(msg)
    raise HSM2DongleError(msg)
956 # Ask the device to update its blockchain references by processing
957 # a given set of blocks and their brothers.
958 # blocks: list of hex strings
959 # (each hex string is a raw block header,
960 # which should *always* include merge mining fields)
961 # brothers: list of list of hex strings
962 # (each list of hex strings is the block's brothers' headers
963 # for the corresponding block header in the same position
964 # of the blocks list)
def advance_blockchain(self, blocks, brothers):
    """Ask the device to advance its blockchain references.

    blocks: list of hex strings (each one a raw block header, which
        should *always* include merge mining fields)
    brothers: list of lists of hex strings (each inner list holds the
        brothers' headers of the block at the same position in `blocks`)

    Each group of brothers is sorted by block hash before being sent.
    Returns whatever _do_block_operation() returns.
    """
    # Convenient shorthands
    err = self.ERR.ADVANCE
    response = self.RESPONSE.ADVANCE

    # Sort each group of brothers by block hash
    brothers = [
        sorted(group, key=lambda header: bytes.fromhex(get_block_hash(header)))
        for group in brothers
    ]

    # Device error codes, grouped by the response they translate into
    codes_by_response = (
        (response.ERROR_INVALID_BLOCK, (
            err.BUFFER_OVERFLOW,
            err.MERKLE_PROOF_OVERFLOW,
            err.CB_TXN_OVERFLOW,
            err.RLP_INVALID,
            err.BLOCK_TOO_SHORT,
            err.PARENT_HASH_INVALID,
            err.UMM_ROOT_INVALID,
            err.BTC_HEADER_INVALID,
            err.MERKLE_PROOF_INVALID,
            err.BLOCK_DIFF_INVALID,
            err.BLOCK_NUM_INVALID,
            err.BLOCK_TOO_OLD,
            err.MM_RLP_LEN_MISMATCH,
        )),
        (response.ERROR_POW_INVALID, (
            err.MERKLE_PROOF_MISMATCH,
            err.BTC_CB_TXN_INVALID,
            err.MM_HASH_MISMATCH,
            err.BTC_DIFF_MISMATCH,
            err.CB_TXN_HASH_MISMATCH,
        )),
        (response.ERROR_INVALID_BROTHERS, (
            err.BROTHERS_TOO_MANY,
            err.BROTHER_PARENT_MISMATCH,
            err.BROTHER_SAME_AS_BLOCK,
            err.BROTHER_ORDER_INVALID,
        )),
        (response.ERROR_CHAINING_MISMATCH, (err.CHAIN_MISMATCH,)),
        (response.ERROR_UNSUPPORTED_CHAIN, (err.TOTAL_DIFF_OVERFLOW,)),
        (response.ERROR_BLOCK_DATA, (err.PROT_INVALID,)),
    )
    error_mapping = {}
    for outcome, codes in codes_by_response:
        for code in codes:
            error_mapping[code] = outcome

    return self._do_block_operation(
        "advance",
        blocks,
        brothers,
        self.CMD.ADVANCE,
        self.OP.ADVANCE,
        err,
        response,
        error_mapping,
    )
1015 # Ask the device to update its ancestor block and ancestor receipts root
1016 # references by processing a given set of blocks.
1017 # blocks: list of hex strings
1018 # (each hex string is a raw block header,
1019 # which doesn't need to include merge mining fields -
1020 # those will be stripped for efficiency before being sent
1021 # to the device anyway)
def update_ancestor(self, blocks):
    """Run the "update ancestor" block operation on the device.

    blocks: list of hex strings, each one a raw block header. Merge mining
    fields are removed (if present) from every header before sending.

    Returns whatever _do_block_operation returns, or the tuple
    (False, RESPONSE.UPD_ANCESTOR.ERROR_REMOVE_MM_FIELDS) when removing
    the merge mining fields raises a ValueError.
    """
    device_errors = self.ERR.UPD_ANCESTOR
    results = self.RESPONSE.UPD_ANCESTOR

    # Optimization: send headers without their merge mining fields
    try:
        self.logger.info("Removing merge mining fields from %d blocks", len(blocks))
        stripped_blocks = [remove_mm_fields_if_present(block) for block in blocks]
    except ValueError as e:
        self.logger.error("While removing merge mining fields: %s", str(e))
        return (False, results.ERROR_REMOVE_MM_FIELDS)

    # Device error codes that are all reported as an invalid block
    invalid_block_errors = (
        device_errors.BUFFER_OVERFLOW,
        device_errors.RLP_INVALID,
        device_errors.BLOCK_TOO_SHORT,
        device_errors.PARENT_HASH_INVALID,
        device_errors.RECEIPT_ROOT_INVALID,
        device_errors.BTC_HEADER_INVALID,
        device_errors.BLOCK_NUM_INVALID,
        device_errors.BLOCK_TOO_OLD,
        device_errors.MM_RLP_LEN_MISMATCH,
    )
    chunk_error_mapping = {
        code: results.ERROR_INVALID_BLOCK for code in invalid_block_errors
    }
    chunk_error_mapping[device_errors.ANCESTOR_TIP_MISMATCH] = \
        results.ERROR_TIP_MISMATCH
    chunk_error_mapping[device_errors.CHAIN_MISMATCH] = \
        results.ERROR_CHAINING_MISMATCH
    chunk_error_mapping[device_errors.PROT_INVALID] = results.ERROR_BLOCK_DATA

    return self._do_block_operation(
        "updancestor",
        stripped_blocks,
        None,  # update ancestor sends no brothers
        self.CMD.UPD_ANCESTOR,
        self.OP.UPD_ANCESTOR,
        device_errors,
        results,
        chunk_error_mapping,
    )
def get_ui_attestation(self, ud_value_hex):
    """Gather the UI attestation from the device.

    ud_value_hex: hex string with the user-defined value to send.

    Returns a dict with the hex-encoded "app_hash", "message" and
    "signature" gathered from the device.

    Raises HSM2DongleError if the device still signals more message pages
    after MAX_PAGES_UI_ATT_MESSAGE pages have been retrieved.
    """
    # Parse hexadecimal values
    ud_value = bytes.fromhex(ud_value_hex)

    # Get UI hash
    ui_hash = self._send_command(
        self.CMD.UI_ATT, bytes([self.OP.UI_ATT.OP_APP_HASH])
    )[self.OFF.DATA:]

    # Send UD value
    self._send_command(
        self.CMD.UI_ATT, bytes([self.OP.UI_ATT.OP_UD_VALUE]) + ud_value
    )

    # Retrieve message, one page at a time
    page = 0
    message = b""
    while True:
        if page == self.MAX_PAGES_UI_ATT_MESSAGE:
            # The format string needs a conversion specifier: without it
            # the % operator raises TypeError instead of producing the
            # message for the intended HSM2DongleError
            msg = (
                "Maximum number of UI attestation pages exceeded (%d)"
                % self.MAX_PAGES_UI_ATT_MESSAGE
            )
            self.logger.error(msg)
            raise HSM2DongleError(msg)
        response = self._send_command(
            self.CMD.UI_ATT, bytes([self.OP.UI_ATT.OP_GET_MSG, page])
        )
        page += 1
        # First data byte: zero when this was the last page.
        # Remaining data bytes: this page's piece of the message.
        message += response[self.OFF.DATA + 1:]
        if response[self.OFF.DATA] == 0:
            break

    # Retrieve attestation
    attestation = self._send_command(
        self.CMD.UI_ATT, bytes([self.OP.UI_ATT.OP_GET])
    )[self.OFF.DATA:]

    return {
        "app_hash": ui_hash.hex(),
        "message": message.hex(),
        "signature": attestation.hex(),
    }
def get_powhsm_attestation(self, ud_value_hex):
    """Run a PowHsmAttestation command bound to this dongle.

    ud_value_hex is handed over unchanged; the command's result is returned.
    """
    attestation_command = PowHsmAttestation(self)
    return attestation_command.run(ud_value_hex)
def get_signer_heartbeat(self, ud_value):
    """Run an HSM2SignerHeartbeat command bound to this dongle.

    ud_value is handed over unchanged; the command's result is returned.
    """
    heartbeat_command = HSM2SignerHeartbeat(self)
    return heartbeat_command.run(ud_value)
def get_ui_heartbeat(self, ud_value):
    """Run an HSM2UIHeartbeat command bound to this dongle.

    ud_value is handed over unchanged; the command's result is returned.
    """
    heartbeat_command = HSM2UIHeartbeat(self)
    return heartbeat_command.run(ud_value)
def authorize_signer(self, signer_authorization):
    """Authorize a signer version on the device.

    signer_authorization: object exposing
      - signer_version.hash (hex string) and signer_version.iteration (int)
      - signatures (iterable of hex strings)

    The signer version is sent first, followed by the signatures one at a
    time until the device reports success.

    Returns True as soon as the device reports success.
    Raises HSM2DongleError if every signature was sent (or none was given)
    without the device reporting success.
    """
    # Send signer version
    signer_version = signer_authorization.signer_version
    self._send_command(
        self.CMD.SIGNER_AUTH,
        bytes([self.OP.SIGNER_AUTH.OP_SIGVER]) +
        bytes.fromhex(signer_version.hash) +
        signer_version.iteration.to_bytes(
            self.SIGNER_AUTH_ITERATION_SIZE,
            byteorder='big', signed=False))

    # Send signatures one by one
    for signature in signer_authorization.signatures:
        result = self._send_command(
            self.CMD.SIGNER_AUTH,
            bytes([self.OP.SIGNER_AUTH.OP_SIGN]) +
            bytes.fromhex(signature))[self.OFF.DATA]
        # Are we done?
        if result == self.OP.SIGNER_AUTH.OP_SIGN_RES_SUCCESS:
            return True

    # Success always returns from within the loop, so reaching this point
    # means the signatures given were not enough
    raise HSM2DongleError("Not enough signatures given. "
                          "Signer authorization failed")
1134 # Used both for advance blockchain and update ancestor given the protocol
1135 # is very similar
def _do_block_operation(
    self,
    operation_name,
    blocks,
    brothers,
    command,
    ops,
    errors,
    responses,
    chunk_error_mapping,
):
    """Drive a whole block operation (advance blockchain / update ancestor).

    operation_name: name used in log messages
    blocks: list of hex strings, one per block header to send
    brothers: list with one list of brother headers (hex strings) per
              block; only read when command is CMD.ADVANCE
    command: device command to issue
    ops: operation codes for the command
    errors: device error codes for the command
    responses: result codes to hand back to the caller
    chunk_error_mapping: dict from device error code to result code, passed
                         on to _send_block_header

    Returns a (success, result) tuple: (True, responses.OK_PARTIAL) or
    (True, responses.OK_TOTAL) on success, (False, <result code>) on a
    handled failure.
    Raises HSM2DongleError if, after sending everything, the device
    signalled neither partial nor total success. This includes the case
    of an empty list of blocks.
    """
    # *** Block operation protocol ***
    # The order in which things are required and then sent is:
    # 1. Initialization, where the total number of blocks to send is sent.
    # 2. For each block header:
    #   2.1. Block metadata (single message):
    #        - MM payload size in bytes
    #          (see the block_utils.rlp_mm_payload_size method for details)
    #        - In case of an advance blockchain operation,
    #          coinbase transaction hash (see coinbase_tx_get_hash
    #          for details)
    #   2.2. Block chunks: block header pieces as requested by the ledger.
    #   2.3. Brothers -- only for advance blockchain:
    #     2.3.1 Brothers metadata (single message):
    #        - Brother count
    #     2.3.2 For each brother (if brother count was greater than zero):
    #       2.3.2.1. Brother metadata (single message):
    #          - MM payload size in bytes
    #          - Coinbase transaction hash
    #       2.3.2.2. Brother chunks: brother header pieces as requested
    #                by the ledger.
    #
    # During these exchanges, an exception can be raised at any moment,
    # which would signal failure. Specific error codes come with
    # HSM2DongleErrorResult exception instances and are handled accordingly.
    # Anything else is treated as an unexpected error and is left for the
    # calling layer to handle.

    title = operation_name.capitalize()

    # Step 1. Send initialization
    num_blocks_bytes = len(blocks).to_bytes(4, byteorder="big", signed=False)
    data = bytes([ops.INIT]) + num_blocks_bytes
    try:
        self.logger.info("%s: sending initialization - %s", title, data.hex())
        init_response = self._send_command(command, data)

        # We expect the device to ask for block metadata next.
        # If this doesn't happen, error out
        if init_response[self.OFF.OP] != ops.HEADER_META:
            self.logger.error(
                "%s: unexpected response %s", title, init_response.hex()
            )
            return (False, responses.ERROR_UNEXPECTED)
    except HSM2DongleErrorResult as e:
        self.logger.error("%s returned: %s", title, hex(e.error_code))
        if e.error_code in [errors.PROT_INVALID]:
            return (False, responses.ERROR_INIT)
        return (False, responses.ERROR_UNEXPECTED)

    # Step 2. Send blocks (and brothers, if any)
    # Outcome of the last header exchange, as a (success, device response)
    # pair. Stays None when there are no blocks to send.
    outcome = None
    total_blocks = len(blocks)
    for block_number, block in enumerate(blocks, 1):
        self.logger.info(
            "%s: sending block #%d/%d", title, block_number, total_blocks
        )

        outcome = self._send_block_header(
            operation_name=operation_name,
            header_name="block",
            block=block,
            command=command,
            ops=ops,
            op_meta=ops.HEADER_META,
            op_chunk=ops.HEADER_CHUNK,
            responses=responses,
            errors=errors,
            chunk_error_mapping=chunk_error_mapping
        )
        if not outcome[0]:
            return outcome

        # Step 2.3. Send brothers
        # *** Only for advance blockchain and if requested by the dongle ***
        if command != self.CMD.ADVANCE or \
           outcome[1][self.OFF.OP] != ops.BROTHER_LIST_META:
            continue

        # Step 2.3.1. Send brother list metadata
        brother_list = brothers[block_number - 1]
        brother_count = len(brother_list)
        brother_count_bytes = brother_count.to_bytes(
            1, byteorder="big", signed=False
        )
        data = bytes([ops.BROTHER_LIST_META]) + brother_count_bytes
        try:
            self.logger.info(
                "%s: sending brother list metadata - %s", title, data.hex()
            )
            outcome = [None, self._send_command(command, data)]

            # If we have at least one brother,
            # we expect the device to ask for brother metadata next.
            # If this doesn't happen, error out
            if brother_count > 0 and \
               outcome[1][self.OFF.OP] != ops.BROTHER_META:
                self.logger.error(
                    "%s: unexpected response %s", title, outcome[1].hex()
                )
                return (False, responses.ERROR_UNEXPECTED)
        except HSM2DongleErrorResult as e:
            self.logger.error("%s returned: %s", title, hex(e.error_code))
            if e.error_code in [errors.PROT_INVALID, errors.BROTHERS_TOO_MANY]:
                return (False, responses.ERROR_INVALID_BROTHERS)
            return (False, responses.ERROR_UNEXPECTED)

        # Step 2.3.2. Send each brother
        for brother_number, brother in enumerate(brother_list, 1):
            self.logger.info(
                "%s: sending brother #%d/%d",
                title,
                brother_number,
                brother_count,
            )

            outcome = self._send_block_header(
                operation_name=operation_name,
                header_name="brother",
                block=brother,
                command=command,
                ops=ops,
                op_meta=ops.BROTHER_META,
                op_chunk=ops.BROTHER_CHUNK,
                responses=responses,
                errors=errors,
                chunk_error_mapping=chunk_error_mapping
            )
            if not outcome[0]:
                return outcome

    # With no blocks sent there is no device response to inspect: go
    # straight to the unexpected state error instead of indexing into the
    # raw initialization response
    if outcome is not None:
        requested_op = outcome[1][self.OFF.OP]

        # Partial success?
        if command == self.CMD.ADVANCE and requested_op == ops.PARTIAL:
            self.logger.info("%s: partial success", title)
            return (True, responses.OK_PARTIAL)

        # Success?
        if requested_op == ops.SUCCESS:
            self.logger.info("%s: total success", title)
            return (True, responses.OK_TOTAL)

    # We shouldn't be able to ever reach this point
    msg = "%s: unexpected state" % title
    self.logger.fatal(msg)
    raise HSM2DongleError(msg)
1302 # Send an individual block header to the device, including computing
1303 # and sending metadata
1304 # This is used both for advance blockchain (block and brother headers)
1305 # and update ancestor given the protocol is very similar
def _send_block_header(
    self,
    operation_name,
    header_name,
    block,
    command,
    ops,
    op_meta,
    op_chunk,
    responses,
    errors,
    chunk_error_mapping
):
    """Send one header (block or brother) to the device.

    The header metadata is computed and sent first; the header itself is
    then sent in chunks as the device requests them.

    operation_name: name used in log messages
    header_name: "block" or "brother"; used in log messages and to decide
                 which operations the device may request next
    block: hex string with the header to send
    command: device command to issue
    ops: operation codes for the command
    op_meta: operation code used to send the metadata
    op_chunk: operation code used to send each chunk
    responses: result codes to hand back to the caller
    errors: device error codes for the command
    chunk_error_mapping: dict from device error code to result code, used
                         for errors raised while sending chunks

    Returns (True, <last device response>) on success, or
    (False, <result code>) on failure.
    """
    title = operation_name.capitalize()

    # A. Compute and send header metadata
    # (computing it also validates the header; failures surface as a
    # ValueError, handled below)
    try:
        # RLP payload size for merge mining hash
        payload_size = rlp_mm_payload_size(block)
        self.logger.debug(
            "%s metadata: MM payload length %d",
            header_name.capitalize(),
            payload_size)
        payload_size_bytes = payload_size.to_bytes(
            2, byteorder="big", signed=False
        )

        # Coinbase transaction hash: only sent for advance blockchain
        coinbase_hash = b""
        if command == self.CMD.ADVANCE:
            coinbase_txn = get_coinbase_txn(block)
            coinbase_hash = bytes.fromhex(coinbase_tx_get_hash(coinbase_txn))
            self.logger.debug(
                "%s Metadata: CB txn hash: %s",
                header_name.capitalize(),
                coinbase_hash.hex())

        # Wrap and send
        metadata = b"".join((bytes([op_meta]), payload_size_bytes, coinbase_hash))
        self.logger.info(
            "%s: sending %s metadata - %s", title, header_name, metadata.hex()
        )
        meta_response = self._send_command(command, metadata)

        # The device must ask for a chunk next; anything else is an error
        if meta_response[self.OFF.OP] != op_chunk:
            self.logger.error(
                "%s: unexpected response %s", title, meta_response.hex()
            )
            return (False, responses.ERROR_UNEXPECTED)

        # Size of the first chunk, as requested by the device
        first_chunk_size = meta_response[self.OFF.DATA]
    except ValueError as e:
        self.logger.error("Computing %s metadata: %s", header_name, str(e))
        return (False, responses.ERROR_COMPUTE_METADATA)
    except HSM2DongleErrorResult as e:
        self.logger.error("%s returned: %s", title, hex(e.error_code))
        if e.error_code in [errors.PROT_INVALID]:
            return (False, responses.ERROR_METADATA)
        return (False, responses.ERROR_UNEXPECTED)

    # B. Send header data in chunks
    try:
        # Operations the device may request once it is done with this
        # header; these depend on the command and the type of header
        allowed_next = [op_chunk, op_meta, ops.SUCCESS]
        if command == self.CMD.ADVANCE:
            allowed_next.append(ops.PARTIAL)
            if header_name == "block":
                allowed_next.append(ops.BROTHER_LIST_META)
            elif header_name == "brother":
                allowed_next.append(ops.HEADER_META)

        chunks_result = self._send_data_in_chunks(
            command=command,
            operation=op_chunk,
            next_operations=allowed_next,
            data=bytes.fromhex(block),
            expect_full_data=False,
            initial_bytes=first_chunk_size,
            operation_name=operation_name,
            data_description=header_name,
        )

        if not chunks_result[0]:
            return (False, responses.ERROR_UNEXPECTED)
    except HSM2DongleErrorResult as e:
        self.logger.error("%s returned: %s", title, hex(e.error_code))
        mapped = chunk_error_mapping.get(e.error_code, responses.ERROR_UNEXPECTED)
        return (False, mapped)

    return chunks_result
1411 # Send a specific piece of data in chunks to the device
1412 # as the device requests bytes from it.
1413 # Validate responses wrt current operation and next possible expected operations
1414 # Exceptions are to be handled by the caller
def _send_data_in_chunks(
    self,
    command,
    operation,
    next_operations,
    data,
    expect_full_data,
    initial_bytes,
    operation_name,
    data_description,
):
    """Send data to the device in chunks of the sizes the device requests.

    command: device command to issue
    operation: operation code used to send each chunk
    next_operations: list of operation codes the device may request other
                     than `operation`
    data: bytes to send
    expect_full_data: when True, fail if the device moves on before all of
                      `data` was sent
    initial_bytes: size of the first chunk
    operation_name, data_description: used in log messages

    Returns (True, <last device response>) once the device requests an
    operation other than `operation`, or (False, <device response>) if the
    device requests an operation that is not allowed, or moves on before
    all data was sent while expect_full_data is set.
    Exceptions are to be handled by the caller.
    """
    title = operation_name.capitalize()
    allowed_operations = [operation] + next_operations
    total = len(data)

    # Number of bytes sent so far, which is also the start of the next chunk
    offset = 0
    bytes_requested = initial_bytes
    while True:
        chunk = data[offset:offset + bytes_requested]
        self.logger.debug(
            "%s: sending %s chunk [%d:%d] - %s",
            title,
            data_description,
            offset,
            offset + len(chunk),
            chunk.hex(),
        )
        response = self._send_command(command, bytes([operation]) + chunk)
        offset += len(chunk)

        # We expect the device to either ask for the current or for one of
        # the next operations. If none of this happens, error out
        requested_operation = response[self.OFF.OP]
        if requested_operation not in allowed_operations:
            # Report the operation read through OFF.OP, the same offset
            # used for the check above, instead of a hard-coded index
            self.logger.debug(
                "Current operation %s, next operations %s, ledger requesting %s",
                hex(operation),
                str(list(map(hex, next_operations))),
                hex(requested_operation),
            )
            self.logger.error(
                "%s: unexpected response %s", title, response.hex()
            )
            return (False, response)

        # We finish when the device requests the next piece of data
        if requested_operation != operation:
            # Have we finished but not sent all data when required to do so?
            if expect_full_data and offset < total:
                self.logger.error(
                    "%s: expected to send all %d data bytes but sent %d and got %s",
                    title,
                    total,
                    offset,
                    response.hex()
                )
                return (False, response)

            # All is good
            return (True, response)

        # How many bytes to send in the next message
        bytes_requested = response[self.OFF.DATA]
        self.logger.debug("Dongle requested %d bytes", bytes_requested)