Coverage for ledger/hsm2dongle.py: 90%

684 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23import struct 

24from enum import IntEnum, Enum, auto 

25from ledgerblue.comm import getDongle 

26from ledgerblue.commException import CommException 

27import hid 

28from .signature import HSM2DongleSignature 

29from .version import HSM2FirmwareVersion 

30from .parameters import HSM2FirmwareParameters 

31from .hsm2dongle_cmds import HSM2SignerHeartbeat, HSM2UIHeartbeat, PowHsmAttestation 

32from .block_utils import ( 

33 rlp_mm_payload_size, 

34 remove_mm_fields_if_present, 

35 get_coinbase_txn, 

36 get_block_hash, 

37) 

38from comm.bitcoin import encode_varint 

39from comm.pow import coinbase_tx_get_hash 

40import logging 

41 

42# Enumerations 

43 

44# Dongle commands 

45 

46 

class _Command(IntEnum):
    """Command byte values placed right after the CLA byte of each APDU.

    ECHO (a UI command) and SIGN (a Signer command) deliberately share the
    value 0x02. Because ECHO is declared first, SIGN is an enum alias of it.
    """

    IS_ONBOARD = 0x06
    ECHO = 0x02  # UI command
    SIGN = 0x02  # Signer command
    GET_PUBLIC_KEY = 0x04
    SEND_PIN = 0x41
    UNLOCK = 0xFE
    CHANGE_PIN = 0x08
    GET_MODE = 0x43
    EXIT_MENU = 0xFF
    EXIT_MENU_NO_AUTOEXEC = 0xFA
    GET_STATE = 0x20
    RESET_AB = 0x21
    ADVANCE = 0x10
    UPD_ANCESTOR = 0x30
    GET_PARAMETERS = 0x11
    SEED = 0x44
    WIPE = 0x07
    UI_ATT = 0x50
    SIGNER_AUTH = 0x51
    RETRIES = 0x45

68 

69 

70# Sign command OPs 

class _SignOps(IntEnum):
    """Operation byte values used within the SIGN command exchange."""

    PATH = 0x01
    BTC_TX = 0x02
    TX_RECEIPT = 0x04
    MERKLE_PROOF = 0x08
    SUCCESS = 0x81

77 

78 

79# Get blockchain state command OPs 

class _GetStateOps(IntEnum):
    """Operation byte values of the "get blockchain state" command."""

    HASH = 0x01
    DIFF = 0x02
    FLAGS = 0x03

84 

85 

86# Reset advance blockchain command OPs 

class _ResetAdvanceOps(IntEnum):
    """Operation byte values of the "reset advance blockchain" command."""

    INIT = 0x01
    DONE = 0x02

90 

91 

92# Advance blockchain command OPs 

class _AdvanceOps(IntEnum):
    """Operation byte values of the "advance blockchain" command."""

    INIT = 0x02
    HEADER_META = 0x03
    HEADER_CHUNK = 0x04
    PARTIAL = 0x05
    SUCCESS = 0x06
    BROTHER_LIST_META = 0x07
    BROTHER_META = 0x08
    BROTHER_CHUNK = 0x09

102 

103 

104# Update ancestor command OPs 

class _UpdateAncestorOps(IntEnum):
    """Operation byte values of the "update ancestor" command."""

    INIT = 0x02
    HEADER_META = 0x03
    HEADER_CHUNK = 0x04
    SUCCESS = 0x05

110 

111 

112# UI attestation OPs 

class _UIAttestationOps(IntEnum):
    """Operation byte values of the UI attestation command."""

    OP_UD_VALUE = 0x01
    OP_GET_MSG = 0x02
    OP_GET = 0x03
    OP_APP_HASH = 0x04

118 

119 

120# Signer authorization OPs (and results) 

class _SignerAuthorizationOps(IntEnum):
    """Operation byte values (and result values) of signer authorization.

    The result names reuse the numeric values of the operations, so
    OP_SIGN_RES_MORE is an enum alias of OP_SIGVER and OP_SIGN_RES_SUCCESS
    is an enum alias of OP_SIGN.
    """

    # Operations
    OP_SIGVER = 0x01
    OP_SIGN = 0x02
    # Results
    OP_SIGN_RES_MORE = 0x01
    OP_SIGN_RES_SUCCESS = 0x02

126 

127 

128# Command Ops 

class _Ops:
    """Namespace grouping the per-command operation enumerations."""

    SIGN = _SignOps
    GST = _GetStateOps
    RAV = _ResetAdvanceOps
    ADVANCE = _AdvanceOps
    UPD_ANCESTOR = _UpdateAncestorOps
    UI_ATT = _UIAttestationOps
    SIGNER_AUTH = _SignerAuthorizationOps

137 

138 

139# Protocol offsets 

class _Offset(IntEnum):
    """Byte positions of the protocol fields within an exchanged message."""

    CLA = 0
    CMD = 1
    OP = 2
    DATA = 3

145 

146 

147# Device modes 

class _Mode(IntEnum):
    """Device modes; UNKNOWN stands for a mode that could not be determined."""

    BOOTLOADER = 0x02
    SIGNER = 0x03
    UI_HEARTBEAT = 0x04
    UNKNOWN = 0xFF

153 

154 

155# Get blockchain state flag indexes 

class _GetStateFlagOffset(IntEnum):
    """Index of each flag within the data part of a "get state flags" reply."""

    IN_PROGRESS = 0
    ALREADY_VALIDATED = 1
    FOUND_BEST_BLOCK = 2

160 

161 

162# Get blockchain state constants 

class _GetState:
    """Constants used by the "get blockchain state" command."""

    # Positions of the individual flags in a flags reply
    FLAG_OFFSET = _GetStateFlagOffset

    # State entry name -> hash selector byte sent to the device
    HASH_VALUES = {
        "best_block": 0x01,
        "newest_valid_block": 0x02,
        "ancestor_block": 0x03,
        "ancestor_receipts_root": 0x05,
        "updating.best_block": 0x81,
        "updating.newest_valid_block": 0x82,
        "updating.next_expected_block": 0x84,
    }

174 

175 

176# Sign command errors 

class _SignError(IntEnum):
    """Error codes returned by the device for the SIGN command.

    Only the first code is spelled out; every following member is numbered
    consecutively through auto(), so declaration order is significant.
    """

    DATA_SIZE = 0x6A87
    INPUT = auto()  # 0x6A88
    STATE = auto()  # 0x6A89
    RLP = auto()  # 0x6A8A
    RLP_INT = auto()  # 0x6A8B
    RLP_DEPTH = auto()  # 0x6A8C
    TX_HASH_MISMATCH = auto()  # 0x6A8D
    TX_VERSION = auto()  # 0x6A8E
    INVALID_PATH = auto()  # 0x6A8F
    DATA_SIZE_AUTH = auto()  # 0x6A90
    DATA_SIZE_NOAUTH = auto()  # 0x6A91
    NODE_VERSION = auto()  # 0x6A92
    SHARED_PREFIX_TOO_BIG = auto()  # 0x6A93
    RECEIPT_HASH_MISMATCH = auto()  # 0x6A94
    NODE_CHAINING_MISMATCH = auto()  # 0x6A95
    RECEIPT_ROOT_MISMATCH = auto()  # 0x6A96
    INVALID_SIGHASH_COMPUTATION_MODE = auto()  # 0x6A97
    INVALID_EXTRADATA_SIZE = auto()  # 0x6A98

196 

197 

198# Get public key command errors 

class _GetPubKeyError(IntEnum):
    """Error codes returned by the device for the "get public key" command."""

    DATA_SIZE = 0x6A87

201 

202 

203# Advance blockchain and update ancestor command errors 

class _AdvanceUpdateError(IntEnum):
    """Error codes shared by the advance blockchain and update ancestor commands.

    After PROT_INVALID every member is numbered consecutively through
    auto(), so declaration order is significant.
    """

    UNKNOWN = 0
    PROT_INVALID = 0x6B87
    RLP_INVALID = auto()  # 0x6B88
    BLOCK_TOO_OLD = auto()  # 0x6B89
    BLOCK_TOO_SHORT = auto()  # 0x6B8A
    PARENT_HASH_INVALID = auto()  # 0x6B8B
    RECEIPT_ROOT_INVALID = auto()  # 0x6B8C
    BLOCK_NUM_INVALID = auto()  # 0x6B8D
    BLOCK_DIFF_INVALID = auto()  # 0x6B8E
    UMM_ROOT_INVALID = auto()  # 0x6B8F
    BTC_HEADER_INVALID = auto()  # 0x6B90
    MERKLE_PROOF_INVALID = auto()  # 0x6B91
    BTC_CB_TXN_INVALID = auto()  # 0x6B92
    MM_RLP_LEN_MISMATCH = auto()  # 0x6B93
    BTC_DIFF_MISMATCH = auto()  # 0x6B94
    MERKLE_PROOF_MISMATCH = auto()  # 0x6B95
    MM_HASH_MISMATCH = auto()  # 0x6B96
    MERKLE_PROOF_OVERFLOW = auto()  # 0x6B97
    CB_TXN_OVERFLOW = auto()  # 0x6B98
    BUFFER_OVERFLOW = auto()  # 0x6B99
    CHAIN_MISMATCH = auto()  # 0x6B9A
    TOTAL_DIFF_OVERFLOW = auto()  # 0x6B9B
    ANCESTOR_TIP_MISMATCH = auto()  # 0x6B9C
    CB_TXN_HASH_MISMATCH = auto()  # 0x6B9D
    BROTHERS_TOO_MANY = auto()  # 0x6B9E
    BROTHER_PARENT_MISMATCH = auto()  # 0x6B9F
    BROTHER_SAME_AS_BLOCK = auto()  # 0x6BA0
    BROTHER_ORDER_INVALID = auto()  # 0x6BA1

233 

234 

class _UIError(IntEnum):
    """Error codes returned by the device for UI commands."""

    INVALID_PIN = 0x69A0

237 

238 

class _UIAttestationError(IntEnum):
    """Error codes returned by the device for the UI attestation command."""

    PROT_INVALID = 0x6A01
    NO_ONBOARD = 0x6A02
    INTERNAL = 0x6A99

243 

244 

class _SignerAuthorizationError(IntEnum):
    """Error codes returned by the device for the signer authorization command."""

    PROT_INVALID = 0x6A01
    INVALID_ITERATION = 0x6A03
    INVALID_SIGNATURE = 0x6A04
    INVALID_AUTH_INVALID_INDEX = 0x6A05

250 

251 

252# Error codes 

class _Error:
    """Namespace grouping the per-command error code enumerations."""

    SIGN = _SignError
    GETPUBKEY = _GetPubKeyError
    # Advance blockchain and update ancestor share one set of codes
    ADVANCE = _AdvanceUpdateError
    UPD_ANCESTOR = _AdvanceUpdateError
    UI = _UIError
    UI_ATT = _UIAttestationError
    SIGNER_AUTH = _SignerAuthorizationError

    @staticmethod
    def is_user_defined_error(code):
        """Tell whether a code is a user-defined (RSK firmware) error code.

        That is the case for codes within 0x69A0..0x6BFF (both ends
        included) and for the single code 0x6D00.
        """
        in_firmware_range = 0x69A0 <= code <= 0x6BFF
        return in_firmware_range or code == 0x6D00

267 

268 

269# Sign command responses to the user 

class _SignResponse(IntEnum):
    """Result codes handed back to the caller of the sign operations."""

    ERROR_PATH = -1
    ERROR_BTC_TX = -2
    ERROR_TX_RECEIPT = -3
    ERROR_MERKLE_PROOF = -4
    ERROR_HASH = -5
    ERROR_UNEXPECTED = -10

277 

278 

279# Advance blockchain responses to the user 

class _AdvanceResponse(IntEnum):
    """Result codes handed back to the caller of the advance blockchain operation."""

    # Successful outcomes
    OK_TOTAL = 1
    OK_PARTIAL = 2

    # Failures
    ERROR_INIT = -1
    ERROR_COMPUTE_METADATA = -2
    ERROR_METADATA = -3
    ERROR_BLOCK_DATA = -4
    ERROR_INVALID_BLOCK = -5
    ERROR_POW_INVALID = -6
    ERROR_CHAINING_MISMATCH = -7
    ERROR_UNSUPPORTED_CHAIN = -8
    ERROR_INVALID_BROTHERS = -9
    ERROR_UNEXPECTED = -10

294 

295 

296# Update ancestor responses to the user 

class _UpdateAncestorResponse(IntEnum):
    """Result codes handed back to the caller of the update ancestor operation."""

    # Successful outcome
    OK_TOTAL = 1

    # Failures
    ERROR_INIT = -1
    ERROR_COMPUTE_METADATA = -2
    ERROR_METADATA = -3
    ERROR_BLOCK_DATA = -4
    ERROR_INVALID_BLOCK = -5
    ERROR_CHAINING_MISMATCH = -6
    ERROR_TIP_MISMATCH = -7
    ERROR_REMOVE_MM_FIELDS = -8
    ERROR_UNEXPECTED = -10

309 

310 

311# Responses 

class _Response:
    """Namespace grouping the per-operation result code enumerations."""

    SIGN = _SignResponse
    ADVANCE = _AdvanceResponse
    UPD_ANCESTOR = _UpdateAncestorResponse

316 

317 

318# Onboarding constants 

class _Onboarding(IntEnum):
    """Onboarding constants."""

    # Exact number of seed bytes accepted by onboarding
    SEED_LENGTH = 32
    # Timeout passed along with the wipe command
    TIMEOUT = 10

322 

323 

324# Sighash computation modes 

class SighashComputationMode(Enum):
    """Sighash computation modes.

    Each member is declared as a (value, netvalue) pair: the first element
    becomes the regular enum value and the second one is stored in the
    additional ``netvalue`` attribute.
    """

    def __new__(cls, *args, **kwds):
        value, netvalue = args[0], args[1]
        member = object.__new__(cls)
        member._value_ = value
        member.netvalue = netvalue
        return member

    LEGACY = "legacy", 0
    SEGWIT = "segwit", 1

334 

335 

class HSM2DongleBaseError(RuntimeError):
    """Root of the dongle exception hierarchy."""

    @property
    def message(self):
        """First constructor argument, or None when there were no arguments."""
        return self.args[0] if len(self.args) > 0 else None

342 

343 

class HSM2DongleError(HSM2DongleBaseError):
    """Generic dongle failure, e.g. an invalid reply or an unclassified error."""

346 

347 

class HSM2DongleTimeoutError(HSM2DongleBaseError):
    """Raised when an exchange with the dongle timed out."""

    @staticmethod
    def is_timeout(exc):
        """Tell whether exc is the CommException that signals a dongle timeout.

        Only an exact CommException (not a subclass) whose status word is
        0x6F00 and whose message is "Timeout" qualifies.
        """
        if type(exc) != CommException:
            return False
        return bool(exc.sw == 0x6F00 and exc.message == "Timeout")

354 

355 

class HSM2DongleCommError(HSM2DongleBaseError):
    """Raised when communication with the dongle is broken."""

    @staticmethod
    def is_comm_error(exc):
        """Tell whether exc denotes a communication problem with the dongle.

        That is the case for:
        - an exact BaseException whose only argument is "Error while writing"
        - an exact OSError whose only argument is "read error"
        - any HSM2DongleCommError instance
        """
        # Sole argument expected for each of the exact exception types
        telltale_argument = {
            BaseException: "Error while writing",
            OSError: "read error",
        }.get(type(exc))

        if (
            telltale_argument is not None
            and len(exc.args) == 1
            and exc.args[0] == telltale_argument
        ):
            return True

        return isinstance(exc, HSM2DongleCommError)

370 

371 

class HSM2DongleErrorResult(HSM2DongleBaseError):
    """Raised when the dongle answers with a user-defined error code.

    The error code is the first (and only expected) constructor argument.
    """

    @property
    def error_code(self):
        """Error code this exception was created with."""
        return self.args[0]

    def __str__(self):
        code_as_hex = hex(self.error_code)
        return f"Dongle returned error code {code_as_hex}"

379 

380 

381# Handles low-level communication with a powHSM dongle 

382class HSM2Dongle: 

383 # Ledger constants 

384 HASH_SIZE = 32 

385 

386 # APDU prefix 

387 CLA = 0x80 

388 

389 # Enumeration shorthands 

390 OFF = _Offset 

391 CMD = _Command 

392 MODE = _Mode 

393 OP = _Ops 

394 ERR = _Error 

395 GST = _GetState 

396 RESPONSE = _Response 

397 ONBOARDING = _Onboarding 

398 

399 # Dongle exchange timeout 

400 DONGLE_TIMEOUT = 10 # seconds 

401 

402 # Maximum number of pages expected to make up the UI attestation message 

403 MAX_PAGES_UI_ATT_MESSAGE = 4 

404 

405 # Size of the iteration parameter for the signer authorization 

406 SIGNER_AUTH_ITERATION_SIZE = 2 

407 

408 # Shorthand for externally defined commands 

409 ErrorResult = HSM2DongleErrorResult 

410 

def __init__(self, debug):
    """Prepare an instance; no connection to the device is made here.

    debug: stored as-is and later handed to getDongle() by connect()
    """
    self.debug = debug
    self.logger = logging.getLogger("dongle")
    # Last CommException seen by _send_command (None if there was none)
    self.last_comm_exception = None

415 

416 # Send command to device 

def _send_command(self, command, data=b"", timeout=DONGLE_TIMEOUT):
    """Send a command to the device and return its raw reply.

    The message sent is the CLA byte, the command byte and then data.

    command: command byte
    data: bytes that follow the command byte
    timeout: timeout handed to the dongle exchange

    Raises:
    - HSM2DongleErrorResult: the device answered with a user-defined
      error code
    - HSM2DongleTimeoutError: the exchange timed out
    - HSM2DongleCommError: communication with the device is broken
    - HSM2DongleError: anything else
    """
    self.last_comm_exception = None
    try:
        apdu = struct.pack("BB%ds" % len(data), self.CLA, command, data)
        self.logger.debug("Sending command: 0x%s", apdu.hex())
        reply = self.dongle.exchange(apdu, timeout=timeout)
        self.logger.debug("Received: 0x%s", reply.hex())
    except BaseException as exc:
        # BaseException is caught on purpose: is_comm_error() recognises a
        # bare BaseException as one of the communication failure signals.
        is_comm_exception = type(exc) == CommException

        # A user-defined error code becomes an error result
        if is_comm_exception:
            self.last_comm_exception = exc
            status_word = exc.sw
            if _Error.is_user_defined_error(status_word):
                self.logger.error("Received error code: %s", hex(status_word))
                raise HSM2DongleErrorResult(status_word)

        # A dongle timeout becomes a timeout error
        if HSM2DongleTimeoutError.is_timeout(exc):
            raise HSM2DongleTimeoutError(str(exc))

        # A dongle communication problem becomes a comm error
        if HSM2DongleCommError.is_comm_error(exc):
            raise HSM2DongleCommError(str(exc))

        # Everything else becomes a standard error, reported differently
        # depending on whether it came as a CommException or not
        if is_comm_exception:
            msg = "Error sending command: %s" % str(exc)
            self.logger.error(msg)
        else:
            msg = "Unknown error sending command: %s (of type %s)" % \
                (str(exc), type(exc).__name__)
            self.logger.critical(msg)

        raise HSM2DongleError(msg)
    else:
        return reply

456 

457 # Send command version to be used by command classes 

def send_command(self, cmd, op, data, timeout=DONGLE_TIMEOUT):
    """Send a command whose data starts with an operation byte.

    This is the variant meant to be used by command classes: op is
    prepended to data as a single byte and the result of _send_command
    is returned unchanged.
    """
    payload = bytes([op]) + data
    return self._send_command(cmd, payload, timeout)

460 

461 # Connect to the dongle 

def connect(self):
    """Connect to the dongle and keep the handle in self.dongle.

    Raises HSM2DongleCommError if a CommException is raised while
    connecting.
    """
    try:
        self.logger.info("Connecting")
        self.dongle = getDongle(self.debug)
        self.logger.info("Connected")
    except CommException as exc:
        failure = "Error connecting: %s" % exc.message
        self.logger.error(failure)
        raise HSM2DongleCommError(failure)

471 

472 # Disconnect from dongle 

def disconnect(self):
    """Disconnect from the dongle, closing it when it is open.

    Safe to call when no connection was ever established: self.dongle is
    only assigned by connect(), so it is looked up defensively here
    instead of failing with an AttributeError.

    Raises HSM2DongleCommError if a CommException is raised while
    disconnecting.
    """
    try:
        self.logger.info("Disconnecting")
        dongle = getattr(self, "dongle", None)
        if dongle and dongle.opened:
            dongle.close()
        # **** Begin hack ****
        # When running within a docker container,
        # the hidapi library fails to detect a physical
        # usb device reconnection. This will "hard reset" the
        # stack so that a potential physical device reconnection
        # can be detected.
        try:
            hid.hidapi_exit()
        except Exception:
            # hidapi_exit() can sometimes throw. This is a deliberate
            # best-effort call, so any failure is ignored.
            pass
        # **** End hack ****
        self.logger.info("Disconnected")
    except CommException as e:
        msg = "Error disconnecting: %s" % e.message
        self.logger.error(msg)
        raise HSM2DongleCommError(msg)

495 

496 # Return device mode 

def get_current_mode(self):
    """Return the current device mode as a MODE member.

    MODE.UNKNOWN is returned when:
    - sending the command fails with an HSM2DongleError
    - the reply is too short to carry a mode byte
    - the mode byte is not a known MODE value

    The last two cases previously escaped as IndexError and ValueError.
    """
    try:
        apdu_rcv = self._send_command(self.CMD.GET_MODE)
    except HSM2DongleError:
        return self.MODE.UNKNOWN

    try:
        return self.MODE(apdu_rcv[1])
    except (IndexError, ValueError):
        # Short reply or a mode byte this client does not know about
        self.logger.warning(
            "Unrecognised reply to get mode: 0x%s", bytes(apdu_rcv).hex()
        )
        return self.MODE.UNKNOWN

503 

504 # Echo message 

def echo(self):
    """Send a fixed three byte message with the ECHO command.

    Returns True when the reply is exactly the CLA byte, the ECHO command
    byte and the message that was sent; False otherwise.
    """
    payload = b"\x41\x42\x43"
    reply = bytes(self._send_command(self.CMD.ECHO, payload))
    return reply == bytes([self.CLA, self.CMD.ECHO]) + payload

511 

512 # Return true if the hsm2 is onboarded 

def is_onboarded(self):
    """Return True if the device reports being onboarded.

    The device is considered onboarded when the second byte of the reply
    to the IS_ONBOARD command equals 1.
    """
    self.logger.info("Sending isOnboarded")
    reply = self._send_command(self.CMD.IS_ONBOARD)
    onboarded = reply[1] == 1
    answer = "yes" if onboarded else "no"
    self.logger.info("isOnboarded: %s", answer)
    return onboarded

519 

520 # Attempt to onboard the device using the given seed and pin 

521 # Return the generated backup 

def onboard(self, seed, pin):
    """Attempt to onboard the device using the given seed and pin.

    The seed is sent one byte per command (each preceded by its index),
    then the pin is sent with its length prepended, and finally the wipe
    command is issued.

    seed: bytes, exactly ONBOARDING.SEED_LENGTH long
    pin: pin as expected by _send_pin

    Returns True on success.
    Raises HSM2DongleError if the seed is invalid or the second byte of
    the reply to the wipe command is not 2.
    """
    seed_is_valid = type(seed) == bytes and len(seed) == self.ONBOARDING.SEED_LENGTH
    if not seed_is_valid:
        raise HSM2DongleError("Invalid seed given")

    self.logger.info("Sending seed")
    for position, seed_byte in enumerate(seed):
        self._send_command(self.CMD.SEED, bytes([position, seed_byte]))

    self.logger.info("Sending pin")
    self._send_pin(pin, True)

    self.logger.info("Sending wipe")
    wipe_reply = self._send_command(self.CMD.WIPE, timeout=self.ONBOARDING.TIMEOUT)

    if wipe_reply[1] == 2:
        return True

    raise HSM2DongleError("Error onboarding. Got '%s'" % wipe_reply.hex())

540 

541 # send PIN to device, optionally prepending its length 

def _send_pin(self, pin, prepend_length):
    """Send the pin to the device, one byte per SEND_PIN command.

    Each command carries the position of the byte followed by the byte
    itself. When prepend_length is truthy, a byte holding the length of
    the pin is sent first (at position zero).
    """
    payload = bytes([len(pin)]) + pin if prepend_length else pin

    for position, value in enumerate(payload):
        self._send_command(self.CMD.SEND_PIN, bytes([position, value]))

549 

550 # unlock the device with the PIN sent 

def unlock(self, pin):
    """Unlock the device with the given pin.

    The pin is sent first (without its length) and then the unlock
    command per se. Returns True if the device got unlocked: a zero in
    the third byte of the reply indicates a wrong pin, nonzero indicates
    the device is unlocked.
    """
    self._send_pin(pin, prepend_length=False)
    reply = self._send_command(self.CMD.UNLOCK, bytes(2))
    return reply[2] != 0

558 

559 # replace PIN with a new one 

def new_pin(self, pin):
    """Replace the device pin with a new one.

    The pin is sent (with its length prepended) followed by the change
    pin command. Returns True if the pin was changed and False if the
    device rejected it as an invalid pin. Any other error result is
    raised again for the caller to handle.
    """
    try:
        self._send_pin(pin, prepend_length=True)
        self._send_command(self.CMD.CHANGE_PIN)
    except HSM2DongleErrorResult as error_result:
        # Tried to set an invalid pin
        if error_result.error_code == self.ERR.UI.INVALID_PIN:
            return False
        # Something else happened
        raise
    else:
        # All is good
        return True

573 

574 # returns an instance of HSM2FirmwareVersion representing 

575 # the version of the currently running firmware on the HSM2 

576 # that is connected (i.e., could be either the signer or ui) 

def get_version(self):
    """Return the version of the firmware currently running on the device.

    The result is an HSM2FirmwareVersion built from the third, fourth and
    fifth bytes of the reply to the IS_ONBOARD command. The running
    firmware could be either the signer or the ui.
    """
    reply = self._send_command(self.CMD.IS_ONBOARD)
    version_fields = [reply[position] for position in (2, 3, 4)]
    return HSM2FirmwareVersion(*version_fields)

580 

581 # returns the number of pin retries available 

def get_retries(self):
    """Return the number of pin retries available.

    This is the third byte of the reply to the RETRIES command.
    """
    return self._send_command(self.CMD.RETRIES)[2]

585 

586 # returns an instance of HSM2FirmwareParameters representing 

587 # the parameters of the currently running firmware on the HSM2 

588 # that is connected (it should be running the signer). 

def get_signer_parameters(self):
    """Return the parameters of the firmware currently running on the device.

    The device should be running the signer. The result is an
    HSM2FirmwareParameters instance parsed from the data part of the
    reply to the GET_PARAMETERS command.

    Raises HSM2DongleError if a ValueError is raised along the way.
    """
    try:
        reply = self._send_command(self.CMD.GET_PARAMETERS)
        raw_parameters = reply[self.OFF.DATA:]
        return HSM2FirmwareParameters.from_dongle_format(raw_parameters)
    except ValueError as exc:
        failure = "While getting signer firmware parameters: %s" % str(exc)
        self.logger.error(failure)
        raise HSM2DongleError(failure)

597 

598 # exit the ledger nano S menu 

def exit_menu(self, autoexec=True):
    """Exit the ledger nano S menu.

    autoexec: selects the command sent, EXIT_MENU when truthy and
              EXIT_MENU_NO_AUTOEXEC otherwise
    """
    if autoexec:
        command = self.CMD.EXIT_MENU
    else:
        command = self.CMD.EXIT_MENU_NO_AUTOEXEC

    self._send_command(command, bytes(2))

604 

605 # exit the current app 

606 # could either be the UI bootloader, UI heartbeat or Signer 

def exit_app(self):
    """Exit the current app by sending the EXIT_MENU command without data.

    The current app could either be the UI bootloader, the UI heartbeat
    or the Signer.
    """
    exit_command = self.CMD.EXIT_MENU
    self._send_command(exit_command)

609 

610 # get the public key for a bip32 path 

611 # key_id: BIP32Path 

def get_public_key(self, key_id):
    """Get the public key for a bip32 path.

    key_id: BIP32Path

    Returns the whole reply to the GET_PUBLIC_KEY command as a hex string.
    """
    path_bytes = key_id.to_binary()
    reply = self._send_command(self.CMD.GET_PUBLIC_KEY, path_bytes)
    return reply.hex()

615 

616 # Ask the device to sign a specific input of a given unsigned bitcoin transaction 

617 # using the given RSK transaction receipt as an authorization for the signature. 

618 # key_id: BIP32Path 

619 # rsk_tx_receipt: hex string 

620 # btc_tx: hex string 

621 # receipt_merkle_proof: list 

622 # input_index: int 

623 # sighash_computation_mode: a SighashComputationMode instance 

624 # witness_script: hex string 

625 # outpoint_value: int 

def sign_authorized(
    self, key_id, rsk_tx_receipt, receipt_merkle_proof, btc_tx, input_index,
    sighash_computation_mode, witness_script, outpoint_value
):
    """Sign an input of an unsigned bitcoin transaction, authorized by a receipt.

    Asks the device to sign a specific input of a given unsigned bitcoin
    transaction using the given RSK transaction receipt as an
    authorization for the signature.

    key_id: BIP32Path
    rsk_tx_receipt: hex string
    receipt_merkle_proof: list (of hex strings, one per node)
    btc_tx: hex string
    input_index: int
    sighash_computation_mode: a SighashComputationMode instance
    witness_script: hex string
    outpoint_value: int

    Returns (True, HSM2DongleSignature) on success and
    (False, <RESPONSE.SIGN error code>) otherwise.

    *** Signing protocol ***
    The order in which things are required and then sent is:
    1. BIP32 path & BTC tx input index (single message)
    2. BTC transaction (several messages, as required by ledger)
    3. RSK transaction receipt (several messages, as required by ledger)
    4. RSK Tx receipt merkle proof (several messages, as required by ledger)
    (Note: in theory, one could depend only on the order in which the
    ledger requires data and send it blindly. In practice, the exact order
    in which the device will require the data is known, and that order is
    used to validate it along the way.)

    During these exchanges, an exception can be raised at any moment,
    which would signal failure signing. Specific error codes come with
    HSM2DongleErrorResult exception instances and are handled accordingly.
    Anything else is treated as an unexpected error and is left for the
    calling layer to handle.
    """
    sign_ops = self.OP.SIGN
    outcome = self.RESPONSE.SIGN

    # Step 1. Send path and input index
    key_id_bytes = key_id.to_binary()
    input_index_bytes = input_index.to_bytes(4, byteorder="little", signed=False)
    path_message = bytes([sign_ops.PATH]) + key_id_bytes + input_index_bytes
    try:
        self.logger.debug("Sign: sending path - %s", path_message.hex())
        reply = self._send_command(self.CMD.SIGN, path_message)

        # The device is expected to ask for the BTC tx next.
        # If this doesn't happen, error out
        if reply[self.OFF.OP] != sign_ops.BTC_TX:
            self.logger.error("Sign: unexpected response %s", reply.hex())
            return (False, outcome.ERROR_UNEXPECTED)

        # How many bytes to send in the next message
        bytes_requested = reply[self.OFF.DATA]
    except HSM2DongleErrorResult as e:
        self.logger.error("Sign returned: %s", hex(e.error_code))
        path_errors = (
            self.ERR.SIGN.DATA_SIZE,
            self.ERR.SIGN.DATA_SIZE_AUTH,
            self.ERR.SIGN.DATA_SIZE_NOAUTH,
        )
        if e.error_code in path_errors:
            return (False, outcome.ERROR_PATH)
        return (False, outcome.ERROR_UNEXPECTED)

    # Step 2. Send BTC transaction and extra data
    # The BTC transaction is prefixed with the payload length encoded as a
    # 4 bytes little endian unsigned integer. That length accounts for
    # those 4 bytes, 1 byte for the sighash computation mode, 2 bytes for
    # the length of the extradata and the BTC transaction itself (the
    # extradata that is appended at the very end is not counted).
    try:
        payload_length_size = 4
        sighash_mode_size = 1
        extradata_length_size = 2
        outpoint_value_size = 8

        btc_tx_bytes = bytes.fromhex(btc_tx)

        scm_bytes = sighash_computation_mode.netvalue.to_bytes(
            sighash_mode_size, byteorder='little', signed=False
        )

        # Only segwit carries extradata: the witness script (preceded by
        # its length as a varint) followed by the outpoint value
        if sighash_computation_mode == SighashComputationMode.SEGWIT:
            ov_bytes = outpoint_value.to_bytes(
                outpoint_value_size, byteorder='little', signed=False
            )
            ws_bytes = bytes.fromhex(witness_script)
            ws_length_bytes = bytes.fromhex(encode_varint(len(ws_bytes)))
            ed_bytes = ws_length_bytes + ws_bytes + ov_bytes
        else:
            ed_bytes = b""

        edl_bytes = len(ed_bytes).to_bytes(
            extradata_length_size, byteorder='little', signed=False
        )

        payload_length = (
            payload_length_size
            + sighash_mode_size
            + extradata_length_size
            + len(btc_tx_bytes)
        )
        payload_length_bytes = payload_length.to_bytes(
            payload_length_size, byteorder="little", signed=False
        )

        btc_tx_message = b"".join(
            (payload_length_bytes, scm_bytes, edl_bytes, btc_tx_bytes, ed_bytes)
        )

        chunks_result = self._send_data_in_chunks(
            command=self.CMD.SIGN,
            operation=sign_ops.BTC_TX,
            next_operations=[sign_ops.TX_RECEIPT],
            data=btc_tx_message,
            expect_full_data=True,
            initial_bytes=bytes_requested,
            operation_name="sign",
            data_description="BTC tx",
        )

        if not chunks_result[0]:
            return (False, outcome.ERROR_UNEXPECTED)

        bytes_requested = chunks_result[1][self.OFF.DATA]
    except HSM2DongleErrorResult as e:
        self.logger.error("Sign returned: %s", hex(e.error_code))
        btc_tx_errors = (
            self.ERR.SIGN.INPUT,
            self.ERR.SIGN.DATA_SIZE,
            self.ERR.SIGN.TX_HASH_MISMATCH,
            self.ERR.SIGN.TX_VERSION,
            self.ERR.SIGN.INVALID_SIGHASH_COMPUTATION_MODE,
            self.ERR.SIGN.INVALID_EXTRADATA_SIZE,
        )
        if e.error_code in btc_tx_errors:
            return (False, outcome.ERROR_BTC_TX)
        return (False, outcome.ERROR_UNEXPECTED)

    # Step 3. Send transaction receipt
    try:
        chunks_result = self._send_data_in_chunks(
            command=self.CMD.SIGN,
            operation=sign_ops.TX_RECEIPT,
            next_operations=[sign_ops.MERKLE_PROOF],
            data=bytes.fromhex(rsk_tx_receipt),
            expect_full_data=True,
            initial_bytes=bytes_requested,
            operation_name="sign",
            data_description="tx receipt",
        )

        if not chunks_result[0]:
            return (False, outcome.ERROR_UNEXPECTED)

        bytes_requested = chunks_result[1][self.OFF.DATA]
    except HSM2DongleErrorResult as e:
        self.logger.error("Sign returned: %s", hex(e.error_code))
        receipt_errors = (
            self.ERR.SIGN.STATE,
            self.ERR.SIGN.RLP,
            self.ERR.SIGN.RLP_INT,
            self.ERR.SIGN.RLP_DEPTH,
            self.ERR.SIGN.DATA_SIZE,
        )
        if e.error_code in receipt_errors:
            return (False, outcome.ERROR_TX_RECEIPT)
        return (False, outcome.ERROR_UNEXPECTED)

    # Step 4. Send tx receipt merkle proof
    # The format for the receipts merkle proof is as follows:
    # 1 byte for the number of nodes
    # For each node: 1 byte for the node length + the node bytes.
    try:
        if len(receipt_merkle_proof) > 255:
            raise ValueError("Too many nodes")

        proof_parts = [bytes([len(receipt_merkle_proof)])]
        for node in receipt_merkle_proof:
            node_bytes = bytes.fromhex(node)
            if len(node_bytes) > 255:
                raise ValueError("Node too big: %s" % node)
            proof_parts.append(bytes([len(node_bytes)]))
            proof_parts.append(node_bytes)
        merkle_proof_bytes = b"".join(proof_parts)
    except ValueError as e:
        self.logger.error("Sign: invalid receipts merkle proof: %s", str(e))
        return (False, outcome.ERROR_MERKLE_PROOF)

    try:
        chunks_result = self._send_data_in_chunks(
            command=self.CMD.SIGN,
            operation=sign_ops.MERKLE_PROOF,
            next_operations=[sign_ops.SUCCESS],
            data=merkle_proof_bytes,
            expect_full_data=True,
            initial_bytes=bytes_requested,
            operation_name="sign",
            data_description="receipts merkle proof",
        )

        if not chunks_result[0]:
            return (False, outcome.ERROR_UNEXPECTED)
    except HSM2DongleErrorResult as e:
        self.logger.error("Sign returned: %s", hex(e.error_code))
        merkle_proof_errors = (
            self.ERR.SIGN.DATA_SIZE,
            self.ERR.SIGN.STATE,
            self.ERR.SIGN.NODE_VERSION,
            self.ERR.SIGN.SHARED_PREFIX_TOO_BIG,
            self.ERR.SIGN.RECEIPT_HASH_MISMATCH,
            self.ERR.SIGN.NODE_CHAINING_MISMATCH,
            self.ERR.SIGN.RECEIPT_ROOT_MISMATCH,
        )
        if e.error_code in merkle_proof_errors:
            return (False, outcome.ERROR_MERKLE_PROOF)
        return (False, outcome.ERROR_UNEXPECTED)

    # At this point there should be a signature in the data part of the
    # last reply. Return success along with it.
    try:
        return (True, HSM2DongleSignature(chunks_result[1][self.OFF.DATA:]))
    except Exception as e:
        self.logger.error("Error parsing signature: %s", str(e))
        return (False, outcome.ERROR_UNEXPECTED)

834 

835 # Ask the device to sign a specific hash without any authorization. 

836 # key_id: BIP32Path 

837 # hash: hex string 

def sign_unauthorized(self, key_id, hash):
    """Ask the device to sign a specific hash without any authorization.

    key_id: BIP32Path
    hash: hex string

    Returns (True, HSM2DongleSignature) on success and
    (False, <RESPONSE.SIGN error code>) otherwise.

    *** Signing protocol ***
    This signing method requires a single message that contains the
    BIP32 path & hash to sign.

    An exception can be raised, which would signal failure signing.
    Specific error codes come with HSM2DongleErrorResult exception
    instances and are handled accordingly. Anything else is treated as an
    unexpected error and is left for the calling layer to handle.
    """
    outcome = self.RESPONSE.SIGN

    try:
        hash_bytes = bytes.fromhex(hash)
    except ValueError:
        self.logger.error("Sign: invalid hash - %s", hash)
        return (False, outcome.ERROR_HASH)

    # Send path and hash to sign
    try:
        key_id_bytes = key_id.to_binary()
        message = bytes([self.OP.SIGN.PATH]) + key_id_bytes + hash_bytes
        self.logger.debug("Sign: sending path and hash - %s", message.hex())
        response = self._send_command(self.CMD.SIGN, message)
        reply_op = response[self.OFF.OP]

        # Special case: if the device asks for a BTC transaction, then
        # there's a case of both invalid path and invalid hash.
        # Report invalid hash
        if reply_op == self.OP.SIGN.BTC_TX:
            return (False, outcome.ERROR_HASH)

        # The device is expected to report success signing.
        # If this doesn't happen, error out
        if reply_op != self.OP.SIGN.SUCCESS:
            self.logger.error("Sign: unexpected response %s", response.hex())
            return (False, outcome.ERROR_UNEXPECTED)
    except HSM2DongleErrorResult as e:
        self.logger.error("Sign returned: %s", hex(e.error_code))
        code = e.error_code
        if code in (self.ERR.SIGN.DATA_SIZE, self.ERR.SIGN.DATA_SIZE_NOAUTH):
            return (False, outcome.ERROR_HASH)
        if code in (self.ERR.SIGN.INVALID_PATH, self.ERR.SIGN.DATA_SIZE_AUTH):
            return (False, outcome.ERROR_PATH)
        return (False, outcome.ERROR_UNEXPECTED)

    # At this point there should be a signature in the data part.
    # Return success along with it.
    try:
        return (True, HSM2DongleSignature(response[self.OFF.DATA:]))
    except Exception as e:
        self.logger.error("Error parsing signature: %s", str(e))
        return (False, outcome.ERROR_UNEXPECTED)

891 

def get_blockchain_state(self):
    """Gather the blockchain state of the device into a dictionary.

    The dictionary holds:
    - one hex string per entry of GST.HASH_VALUES (same keys)
    - "updating.total_difficulty": int, read as a big endian unsigned
      integer from the data part of the difficulty reply
    - "updating.in_progress", "updating.already_validated" and
      "updating.found_best_block": bool

    Raises HSM2DongleError if any of the replies is invalid.
    """
    op_offset = self.OFF.OP
    data_offset = self.OFF.DATA
    state_ops = self.OP.GST
    state = {}

    # Get hashes
    for key, hash_cmd in self.GST.HASH_VALUES.items():
        self.logger.info("Getting hash value for '%s'", key)
        result = self._send_command(
            self.CMD.GET_STATE, bytes([state_ops.HASH, hash_cmd])
        )
        hash_part = result[data_offset + 1:]

        # Validate result: same operation, same hash selector and a hash
        # of the expected size
        reply_is_valid = (
            result[op_offset] == state_ops.HASH
            and result[data_offset] == hash_cmd
            and len(hash_part) == self.HASH_SIZE
        )
        if not reply_is_valid:
            msg = "Invalid response for hash: %s" % result.hex()
            self.logger.error(msg)
            raise HSM2DongleError(msg)

        state[key] = hash_part.hex()

    # Get difficulty
    self.logger.info("Getting difficulty")
    result = self._send_command(self.CMD.GET_STATE, bytes([state_ops.DIFF]))
    if result[op_offset] != state_ops.DIFF:
        msg = "Invalid response for difficulty: %s" % result.hex()
        self.logger.error(msg)
        raise HSM2DongleError(msg)

    state["updating.total_difficulty"] = int.from_bytes(
        result[data_offset:], byteorder="big", signed=False
    )

    # Get flags
    self.logger.info("Getting flags")
    result = self._send_command(self.CMD.GET_STATE, bytes([state_ops.FLAGS]))
    if result[op_offset] != state_ops.FLAGS or len(result[data_offset:]) != 3:
        msg = "Invalid response for flags: %s" % result.hex()
        self.logger.error(msg)
        raise HSM2DongleError(msg)

    flag_offset = self.GST.FLAG_OFFSET
    flags = (
        ("updating.in_progress", flag_offset.IN_PROGRESS),
        ("updating.already_validated", flag_offset.ALREADY_VALIDATED),
        ("updating.found_best_block", flag_offset.FOUND_BEST_BLOCK),
    )
    for state_key, offset in flags:
        state[state_key] = bool(result[data_offset + offset])

    return state

945 

def reset_advance_blockchain(self):
    """Ask the device to reset the advance blockchain operation.

    Returns True once the device acknowledges with the DONE operation.
    Raises HSM2DongleError if the reply carries any other operation.
    """
    self.logger.info("Resetting advance blockchain")
    reply = self._send_command(self.CMD.RESET_AB, bytes([self.OP.RAV.INIT]))

    if reply[self.OFF.OP] == self.OP.RAV.DONE:
        return True

    msg = "Invalid response for reset advance blockchain: %s" % reply.hex()
    self.logger.error(msg)
    raise HSM2DongleError(msg)

955 

956 # Ask the device to update its blockchain references by processing 

957 # a given set of blocks and their brothers. 

958 # blocks: list of hex strings 

959 # (each hex string is a raw block header, 

960 # which should *always* include merge mining fields) 

961 # brothers: list of list of hex strings 

962 # (each list of hex strings is the block's brothers' headers 

963 # for the corresponding block header in the same position 

964 # of the blocks list) 

def advance_blockchain(self, blocks, brothers):
    """Ask the device to advance its blockchain references.

    Every group of brothers is sorted by block hash (compared as raw bytes)
    and the operation is then delegated to ``_do_block_operation``, together
    with the table that translates the device error codes reported while
    sending header chunks into ``RESPONSE.ADVANCE`` values.

    blocks: list of hex strings, one raw block header each.
    brothers: list of lists of hex strings; the list at position i holds the
        brothers' headers of the block at position i of ``blocks``.

    Returns whatever ``_do_block_operation`` returns.
    """
    # Convenient shorthands
    err = self.ERR.ADVANCE
    response = self.RESPONSE.ADVANCE

    def hash_bytes(header):
        return bytes.fromhex(get_block_hash(header))

    # Sort each group of brothers by block hash
    sorted_brothers = [sorted(group, key=hash_bytes) for group in brothers]

    # Device error codes grouped by the response they translate into
    outcome_groups = (
        (response.ERROR_INVALID_BLOCK, (
            err.BUFFER_OVERFLOW,
            err.MERKLE_PROOF_OVERFLOW,
            err.CB_TXN_OVERFLOW,
            err.RLP_INVALID,
            err.BLOCK_TOO_SHORT,
            err.PARENT_HASH_INVALID,
            err.UMM_ROOT_INVALID,
            err.BTC_HEADER_INVALID,
            err.MERKLE_PROOF_INVALID,
            err.BLOCK_DIFF_INVALID,
            err.BLOCK_NUM_INVALID,
            err.BLOCK_TOO_OLD,
            err.MM_RLP_LEN_MISMATCH,
        )),
        (response.ERROR_POW_INVALID, (
            err.MERKLE_PROOF_MISMATCH,
            err.BTC_CB_TXN_INVALID,
            err.MM_HASH_MISMATCH,
            err.BTC_DIFF_MISMATCH,
            err.CB_TXN_HASH_MISMATCH,
        )),
        (response.ERROR_INVALID_BROTHERS, (
            err.BROTHERS_TOO_MANY,
            err.BROTHER_PARENT_MISMATCH,
            err.BROTHER_SAME_AS_BLOCK,
            err.BROTHER_ORDER_INVALID,
        )),
        (response.ERROR_CHAINING_MISMATCH, (err.CHAIN_MISMATCH,)),
        (response.ERROR_UNSUPPORTED_CHAIN, (err.TOTAL_DIFF_OVERFLOW,)),
        (response.ERROR_BLOCK_DATA, (err.PROT_INVALID,)),
    )
    chunk_error_mapping = {}
    for outcome, codes in outcome_groups:
        chunk_error_mapping.update(dict.fromkeys(codes, outcome))

    return self._do_block_operation(
        "advance",
        blocks,
        sorted_brothers,
        self.CMD.ADVANCE,
        self.OP.ADVANCE,
        err,
        response,
        chunk_error_mapping,
    )

1014 

1015 # Ask the device to update its ancestor block and ancestor receipts root 

1016 # references by processing a given set of blocks. 

1017 # blocks: list of hex strings 

1018 # (each hex string is a raw block header, 

1019 # which doesn't need to include merge mining fields - 

1020 # those will be stripped for efficiency before being sent 

1021 # to the device anyway) 

def update_ancestor(self, blocks):
    """Ask the device to update its ancestor block references.

    Merge mining fields are first removed from every header (when present)
    and the operation is then delegated to ``_do_block_operation`` with no
    brothers, together with the table that translates the device error codes
    reported while sending header chunks into ``RESPONSE.UPD_ANCESTOR`` values.

    blocks: list of hex strings, one raw block header each.

    Returns (False, RESPONSE.UPD_ANCESTOR.ERROR_REMOVE_MM_FIELDS) if removing
    the merge mining fields raises ValueError; otherwise whatever
    ``_do_block_operation`` returns.
    """
    # Convenient shorthands
    err = self.ERR.UPD_ANCESTOR
    response = self.RESPONSE.UPD_ANCESTOR

    # Optimization: remove merge mining fields (if present) from blocks
    try:
        self.logger.info("Removing merge mining fields from %d blocks", len(blocks))
        stripped_blocks = [remove_mm_fields_if_present(header) for header in blocks]
    except ValueError as exc:
        self.logger.error("While removing merge mining fields: %s", str(exc))
        return (False, response.ERROR_REMOVE_MM_FIELDS)

    # Device error codes grouped by the response they translate into
    outcome_groups = (
        (response.ERROR_INVALID_BLOCK, (
            err.BUFFER_OVERFLOW,
            err.RLP_INVALID,
            err.BLOCK_TOO_SHORT,
            err.PARENT_HASH_INVALID,
            err.RECEIPT_ROOT_INVALID,
            err.BTC_HEADER_INVALID,
            err.BLOCK_NUM_INVALID,
            err.BLOCK_TOO_OLD,
            err.MM_RLP_LEN_MISMATCH,
        )),
        (response.ERROR_TIP_MISMATCH, (err.ANCESTOR_TIP_MISMATCH,)),
        (response.ERROR_CHAINING_MISMATCH, (err.CHAIN_MISMATCH,)),
        (response.ERROR_BLOCK_DATA, (err.PROT_INVALID,)),
    )
    chunk_error_mapping = {}
    for outcome, codes in outcome_groups:
        chunk_error_mapping.update(dict.fromkeys(codes, outcome))

    return self._do_block_operation(
        "updancestor",
        stripped_blocks,
        None,
        self.CMD.UPD_ANCESTOR,
        self.OP.UPD_ANCESTOR,
        err,
        response,
        chunk_error_mapping,
    )

1058 

def get_ui_attestation(self, ud_value_hex):
    """Gather the UI attestation from the device.

    The exchange is: request the UI application hash, send the user-defined
    value, retrieve the attested message page by page and finally retrieve
    the attestation itself.

    ud_value_hex: user-defined value as a hex string.

    Returns a dict with the hex-encoded "app_hash", "message" and "signature".
    Raises HSM2DongleError if the device keeps announcing more message pages
    once MAX_PAGES_UI_ATT_MESSAGE pages have been read.
    """
    # Parse hexadecimal values
    ud_value = bytes.fromhex(ud_value_hex)

    # Get UI hash
    ui_hash = self._send_command(
        self.CMD.UI_ATT, bytes([self.OP.UI_ATT.OP_APP_HASH])
    )[self.OFF.DATA:]

    # Send UD value
    data = bytes([self.OP.UI_ATT.OP_UD_VALUE]) + ud_value
    self._send_command(self.CMD.UI_ATT, data)

    # Retrieve message
    page = 0
    message = b""
    while True:
        if page == self.MAX_PAGES_UI_ATT_MESSAGE:
            # The limit is interpolated with %d; without a conversion
            # specifier the % operator would raise TypeError right here
            msg = (
                "Maximum number of UI attestation pages exceeded (%d)"
                % self.MAX_PAGES_UI_ATT_MESSAGE
            )
            self.logger.error(msg)
            raise HSM2DongleError(msg)
        data = bytes([self.OP.UI_ATT.OP_GET_MSG, page])
        response = self._send_command(self.CMD.UI_ATT, data)
        page += 1
        # The first data byte tells whether more pages follow,
        # the rest is the page content
        message += response[self.OFF.DATA + 1:]
        if response[self.OFF.DATA] == 0:
            break

    # Retrieve attestation
    attestation = self._send_command(
        self.CMD.UI_ATT, bytes([self.OP.UI_ATT.OP_GET])
    )[self.OFF.DATA:]

    return {
        "app_hash": ui_hash.hex(),
        "message": message.hex(),
        "signature": attestation.hex(),
    }

1098 

def get_powhsm_attestation(self, ud_value_hex):
    """Run a PowHsmAttestation command bound to this dongle.

    ud_value_hex is passed through untouched to the command's ``run`` method,
    whose result is returned as is.
    """
    attestation_command = PowHsmAttestation(self)
    return attestation_command.run(ud_value_hex)

1101 

def get_signer_heartbeat(self, ud_value):
    """Run an HSM2SignerHeartbeat command bound to this dongle.

    ud_value is passed through untouched to the command's ``run`` method,
    whose result is returned as is.
    """
    heartbeat_command = HSM2SignerHeartbeat(self)
    return heartbeat_command.run(ud_value)

1104 

def get_ui_heartbeat(self, ud_value):
    """Run an HSM2UIHeartbeat command bound to this dongle.

    ud_value is passed through untouched to the command's ``run`` method,
    whose result is returned as is.
    """
    heartbeat_command = HSM2UIHeartbeat(self)
    return heartbeat_command.run(ud_value)

1107 

def authorize_signer(self, signer_authorization):
    """Authorize a signer version on the device.

    The signer version (hash plus iteration, the latter encoded big-endian in
    SIGNER_AUTH_ITERATION_SIZE bytes) is sent first. Signatures are then sent
    one by one until the device reports success.

    signer_authorization: object exposing ``signer_version.hash`` (hex
        string), ``signer_version.iteration`` (int) and ``signatures``
        (iterable of hex strings).

    Returns True as soon as the device reports success.
    Raises HSM2DongleError if every signature was sent (or none was given)
    and the device never reported success.
    """
    version = signer_authorization.signer_version

    # Send signer version
    self._send_command(
        self.CMD.SIGNER_AUTH,
        bytes([self.OP.SIGNER_AUTH.OP_SIGVER]) +
        bytes.fromhex(version.hash) +
        version.iteration.to_bytes(
            self.SIGNER_AUTH_ITERATION_SIZE, byteorder="big", signed=False
        )
    )

    # Send signatures one by one
    for signature in signer_authorization.signatures:
        result = self._send_command(
            self.CMD.SIGNER_AUTH,
            bytes([self.OP.SIGNER_AUTH.OP_SIGN]) + bytes.fromhex(signature)
        )[self.OFF.DATA]
        # Are we done?
        if result == self.OP.SIGNER_AUTH.OP_SIGN_RES_SUCCESS:
            return True

    # Reaching this point means no signature made the device report success
    raise HSM2DongleError("Not enough signatures given. "
                          "Signer authorization failed")

1133 

1134 # Used both for advance blockchain and update ancestor given the protocol 

1135 # is very similar 

def _do_block_operation(
    self,
    operation_name,
    blocks,
    brothers,
    command,
    ops,
    errors,
    responses,
    chunk_error_mapping,
):
    """Drive a whole block operation against the device.

    Returns a tuple whose first element tells whether the operation succeeded
    and whose second element is one of the given ``responses`` values (or,
    when sending a header fails, whatever ``_send_block_header`` returned).
    Raises HSM2DongleError if the device ends up in a state that is neither
    a partial nor a total success.
    """
    # *** Block operation protocol ***
    # The order in which things are required and then sent is:
    # 1. Initialization, where the total number of blocks to send is sent.
    # 2. For each block header:
    #   2.1. Block metadata (single message):
    #       - MM payload size in bytes
    #         (see the block_utils.rlp_mm_payload_size method for details)
    #       - In case of an advance blockchain operation, coinbase transaction
    #         hash (see the block_utils.coinbase_tx_get_hash for details)
    #   2.2. Block chunks: block header pieces as requested by the ledger.
    #   2.3. Brothers -- only for advance blockchain:
    #       2.3.1 Brothers metadata (single message):
    #           - Brother count
    #       2.3.2 For each brother (if brother count was greater than zero):
    #           2.3.2.1. Brother metadata (single message):
    #               - MM payload size in bytes
    #               - Coinbase transaction hash
    #           2.3.2.2. Brother chunks: brother header pieces as requested
    #                    by the ledger.
    #
    # During these exchanges, an exception can be raised at any moment, which
    # would signal failure. Specific error codes come with
    # HSM2DongleErrorResult exception instances and are handled accordingly.
    # Anything else is treated as an unexpected error and is left for the
    # calling layer to handle.

    # Step 1. Send initialization
    block_count_field = len(blocks).to_bytes(4, byteorder="big", signed=False)
    init_payload = bytes([ops.INIT]) + block_count_field
    try:
        label = operation_name.capitalize()
        self.logger.info("%s: sending initialization - %s", label, init_payload.hex())
        outcome = self._send_command(command, init_payload)

        # We expect the device to ask for block metadata next.
        # If this doesn't happen, error out
        if outcome[self.OFF.OP] != ops.HEADER_META:
            self.logger.error("%s: unexpected response %s", label, outcome.hex())
            return (False, responses.ERROR_UNEXPECTED)
    except HSM2DongleErrorResult as exc:
        self.logger.error(
            "%s returned: %s", operation_name.capitalize(), hex(exc.error_code)
        )
        if exc.error_code in [errors.PROT_INVALID]:
            return (False, responses.ERROR_INIT)
        return (False, responses.ERROR_UNEXPECTED)

    # Arguments shared by every header (block or brother) that gets sent
    shared_header_args = dict(
        operation_name=operation_name,
        command=command,
        ops=ops,
        responses=responses,
        errors=errors,
        chunk_error_mapping=chunk_error_mapping,
    )

    # Step 2. Send blocks (and brothers, if any)
    block_count = len(blocks)
    for position, header in enumerate(blocks, 1):
        self.logger.info("%s: sending block #%d/%d", label, position, block_count)

        outcome = self._send_block_header(
            header_name="block",
            block=header,
            op_meta=ops.HEADER_META,
            op_chunk=ops.HEADER_CHUNK,
            **shared_header_args
        )
        if not outcome[0]:
            return outcome

        # Step 2.3. Send brothers
        # *** Only for advance blockchain and if requested by the dongle ***
        if command == self.CMD.ADVANCE and \
           outcome[1][self.OFF.OP] == ops.BROTHER_LIST_META:

            # Step 2.3.1. Send brother list metadata
            siblings = brothers[position - 1]
            sibling_count = len(siblings)
            list_payload = bytes([ops.BROTHER_LIST_META]) + \
                sibling_count.to_bytes(1, byteorder="big", signed=False)
            try:
                self.logger.info(
                    "%s: sending brother list metadata - %s", label, list_payload.hex()
                )
                # Same (flag, device response) shape as a header result
                outcome = [None, self._send_command(command, list_payload)]

                # If we have at least one brother,
                # we expect the device to ask for brother metadata next.
                # If this doesn't happen, error out
                if sibling_count > 0 and outcome[1][self.OFF.OP] != ops.BROTHER_META:
                    self.logger.error(
                        "%s: unexpected response %s", label, outcome[1].hex()
                    )
                    return (False, responses.ERROR_UNEXPECTED)
            except HSM2DongleErrorResult as exc:
                self.logger.error("%s returned: %s", label, hex(exc.error_code))
                if exc.error_code in [errors.PROT_INVALID, errors.BROTHERS_TOO_MANY]:
                    return (False, responses.ERROR_INVALID_BROTHERS)
                return (False, responses.ERROR_UNEXPECTED)

            # Step 2.3.2. Send each brother
            for sibling_position, sibling in enumerate(siblings, 1):
                self.logger.info(
                    "%s: sending brother #%d/%d", label, sibling_position, sibling_count
                )

                outcome = self._send_block_header(
                    header_name="brother",
                    block=sibling,
                    op_meta=ops.BROTHER_META,
                    op_chunk=ops.BROTHER_CHUNK,
                    **shared_header_args
                )
                if not outcome[0]:
                    return outcome

        # Partial success?
        if command == self.CMD.ADVANCE and outcome[1][self.OFF.OP] == ops.PARTIAL:
            self.logger.info("%s: partial success", label)
            return (True, responses.OK_PARTIAL)

    # Success?
    if outcome[1][self.OFF.OP] == ops.SUCCESS:
        self.logger.info("%s: total success", operation_name.capitalize())
        return (True, responses.OK_TOTAL)

    # We shouldn't be able to ever reach this point
    msg = "%s: unexpected state" % operation_name.capitalize()
    self.logger.fatal(msg)
    raise HSM2DongleError(msg)

1301 

1302 # Send an individual block header to the device, including computing 

1303 # and sending metadata 

1304 # This is used both for advance blockchain (block and brother headers) 

1305 # and update ancestor given the protocol is very similar 

def _send_block_header(
    self,
    operation_name,
    header_name,
    block,
    command,
    ops,
    op_meta,
    op_chunk,
    responses,
    errors,
    chunk_error_mapping
):
    """Send a single header (metadata first, then chunks) to the device.

    Returns (False, <one of the given responses>) on failure; otherwise the
    (True, device response) tuple produced by ``_send_data_in_chunks``.
    """
    # A. Compute and send block metadata
    # (this will also validate that the block is a valid RLP-encoded list
    # of the proper size)
    try:
        # RLP payload size for merge mining hash
        payload_size = rlp_mm_payload_size(block)
        self.logger.debug(
            "%s metadata: MM payload length %d",
            header_name.capitalize(),
            payload_size)
        size_field = payload_size.to_bytes(2, byteorder="big", signed=False)

        # Coinbase transaction hash (only sent when advancing the blockchain)
        if command == self.CMD.ADVANCE:
            coinbase_hash = bytes.fromhex(
                coinbase_tx_get_hash(get_coinbase_txn(block))
            )
            self.logger.debug(
                "%s Metadata: CB txn hash: %s",
                header_name.capitalize(),
                coinbase_hash.hex())
        else:
            coinbase_hash = bytes([])

        # Wrap and send
        metadata = bytes([op_meta]) + size_field + coinbase_hash
        self.logger.info(
            "%s: sending %s metadata - %s",
            operation_name.capitalize(),
            header_name,
            metadata.hex()
        )
        reply = self._send_command(command, metadata)

        # We expect the device to ask for a block chunk next.
        # If this doesn't happen, error out
        if reply[self.OFF.OP] != op_chunk:
            self.logger.error(
                "%s: unexpected response %s",
                operation_name.capitalize(),
                reply.hex(),
            )
            return (False, responses.ERROR_UNEXPECTED)

        # How many bytes to send as the first block chunk
        first_chunk_size = reply[self.OFF.DATA]
    except ValueError as exc:
        self.logger.error("Computing %s metadata: %s", header_name, str(exc))
        return (False, responses.ERROR_COMPUTE_METADATA)
    except HSM2DongleErrorResult as exc:
        self.logger.error(
            "%s returned: %s", operation_name.capitalize(), hex(exc.error_code)
        )
        if exc.error_code in [errors.PROT_INVALID]:
            return (False, responses.ERROR_METADATA)
        return (False, responses.ERROR_UNEXPECTED)

    # B. Send block data in chunks
    try:
        # Next possible operations depending on the specific command
        # and type of header we're sending
        allowed_next = [op_chunk, op_meta, ops.SUCCESS]
        if command == self.CMD.ADVANCE:
            allowed_next.append(ops.PARTIAL)
            if header_name == "block":
                allowed_next.append(ops.BROTHER_LIST_META)
            elif header_name == "brother":
                allowed_next.append(ops.HEADER_META)

        result = self._send_data_in_chunks(
            command=command,
            operation=op_chunk,
            next_operations=allowed_next,
            data=bytes.fromhex(block),
            expect_full_data=False,
            initial_bytes=first_chunk_size,
            operation_name=operation_name,
            data_description=header_name,
        )
    except HSM2DongleErrorResult as exc:
        self.logger.error(
            "%s returned: %s", operation_name.capitalize(), hex(exc.error_code)
        )
        # Device error codes with no entry in the table are unexpected
        mapped = chunk_error_mapping.get(exc.error_code, responses.ERROR_UNEXPECTED)
        return (False, mapped)

    if not result[0]:
        return (False, responses.ERROR_UNEXPECTED)

    return result

1410 

1411 # Send a specific piece of data in chunks to the device 

1412 # as the device requests bytes from it. 

1413 # Validate responses wrt current operation and next possible expected operations 

1414 # Exceptions are to be handled by the caller 

def _send_data_in_chunks(
    self,
    command,
    operation,
    next_operations,
    data,
    expect_full_data,
    initial_bytes,
    operation_name,
    data_description,
):
    """Feed ``data`` to the device in pieces of the size it asks for.

    command: command to send every chunk with.
    operation: operation byte prefixed to every chunk; while the device keeps
        answering with it, more chunks are sent.
    next_operations: list of operations the device may answer with to signal
        it is done with this piece of data.
    data: bytes to send.
    expect_full_data: if true, finishing before all of ``data`` was sent is
        reported as a failure.
    initial_bytes: size of the first chunk.
    operation_name, data_description: used for logging only.

    Returns (True, last device response) on success and (False, last device
    response) if the device answered with an unexpected operation or finished
    early when ``expect_full_data`` is set.
    Exceptions raised by ``_send_command`` are to be handled by the caller.
    """
    label = operation_name.capitalize()
    accepted_operations = [operation] + next_operations

    sent = 0  # bytes sent so far, which is also the offset of the next chunk
    bytes_requested = initial_bytes
    while True:
        chunk = data[sent:sent + bytes_requested]
        self.logger.debug(
            "%s: sending %s chunk [%d:%d] - %s",
            label,
            data_description,
            sent,
            sent + len(chunk),
            chunk.hex(),
        )
        response = self._send_command(command, bytes([operation]) + chunk)
        sent += len(chunk)

        # We expect the device to either ask for the current or for the
        # next operation.
        # If none of this happens, error out
        requested_operation = response[self.OFF.OP]
        if requested_operation not in accepted_operations:
            self.logger.debug(
                "Current operation %s, next operations %s, ledger requesting %s",
                hex(operation),
                str(list(map(hex, next_operations))),
                # Report the very byte that was checked above
                hex(requested_operation),
            )
            self.logger.error("%s: unexpected response %s", label, response.hex())
            return (False, response)

        # We finish when the device requests the next piece of data
        if requested_operation != operation:
            # Have we finished but not sent all data when required to do so?
            if expect_full_data and sent < len(data):
                self.logger.error(
                    "%s: expected to send all %d data bytes but sent %d and got %s",
                    label,
                    len(data),
                    sent,
                    response.hex()
                )
                return (False, response)

            # All is good
            return (True, response)

        # How many bytes to send in the next message
        bytes_requested = response[self.OFF.DATA]
        self.logger.debug("Dongle requested %d bytes", bytes_requested)