Coverage for ledger/hsm2dongle.py: 89%

692 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-05 20:41 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23import struct 

24from enum import IntEnum, Enum, auto 

25from ledgerblue.comm import getDongle 

26from ledgerblue.commException import CommException 

27import hid 

28from .signature import HSM2DongleSignature 

29from .version import HSM2FirmwareVersion 

30from .parameters import HSM2FirmwareParameters 

31from .hsm2dongle_cmds import HSM2SignerHeartbeat, HSM2UIHeartbeat 

32from .block_utils import ( 

33 rlp_mm_payload_size, 

34 remove_mm_fields_if_present, 

35 get_coinbase_txn, 

36 get_block_hash, 

37) 

38from comm.bitcoin import encode_varint 

39from comm.pow import coinbase_tx_get_hash 

40import logging 

41 

42# Enumerations 

43 

44# Dongle commands 

45 

46 

class _Command(IntEnum):
    """Dongle command codes.

    Some codes are shared between commands: ``SIGN`` has the same value as
    ``ECHO`` and ``SIGNER_ATT`` the same value as ``UI_ATT``, so the later
    name of each pair is an enum alias of the earlier one.
    """

    IS_ONBOARD = 0x06
    ECHO = 0x02  # UI command
    SIGN = 0x02  # Signer command
    GET_PUBLIC_KEY = 0x04
    SEND_PIN = 0x41
    UNLOCK = 0xFE
    CHANGE_PIN = 0x08
    GET_MODE = 0x43
    EXIT_MENU = 0xFF
    EXIT_MENU_NO_AUTOEXEC = 0xFA
    GET_STATE = 0x20
    RESET_AB = 0x21
    ADVANCE = 0x10
    UPD_ANCESTOR = 0x30
    GET_PARAMETERS = 0x11
    SEED = 0x44
    WIPE = 0x07
    UI_ATT = 0x50
    SIGNER_ATT = 0x50
    SIGNER_AUTH = 0x51
    RETRIES = 0x45

69 

70 

71# Sign command OPs 

class _SignOps(IntEnum):
    """Operation codes used within the sign command."""

    PATH = 0x01
    BTC_TX = 0x02
    TX_RECEIPT = 0x04
    MERKLE_PROOF = 0x08
    SUCCESS = 0x81

78 

79 

80# Get blockchain state command OPs 

class _GetStateOps(IntEnum):
    """Operation codes used within the get blockchain state command."""

    HASH = 0x01
    DIFF = 0x02
    FLAGS = 0x03

85 

86 

87# Reset advance blockchain command OPs 

class _ResetAdvanceOps(IntEnum):
    """Operation codes used within the reset advance blockchain command."""

    INIT = 0x01
    DONE = 0x02

91 

92 

93# Advance blockchain command OPs 

class _AdvanceOps(IntEnum):
    """Operation codes used within the advance blockchain command."""

    INIT = 0x02
    HEADER_META = 0x03
    HEADER_CHUNK = 0x04
    PARTIAL = 0x05
    SUCCESS = 0x06
    BROTHER_LIST_META = 0x07
    BROTHER_META = 0x08
    BROTHER_CHUNK = 0x09

103 

104 

105# Update ancestor command OPs 

class _UpdateAncestorOps(IntEnum):
    """Operation codes used within the update ancestor command."""

    INIT = 0x02
    HEADER_META = 0x03
    HEADER_CHUNK = 0x04
    SUCCESS = 0x05

111 

112 

113# UI attestation OPs 

class _UIAttestationOps(IntEnum):
    """Operation codes used within the UI attestation command."""

    OP_UD_VALUE = 0x01
    OP_GET_MSG = 0x02
    OP_GET = 0x03
    OP_APP_HASH = 0x04

119 

120 

121# Signer attestation OPs 

class _SignerAttestationOps(IntEnum):
    """Operation codes used within the signer attestation command."""

    OP_GET = 0x01
    OP_GET_MESSAGE = 0x02
    OP_APP_HASH = 0x03

126 

127 

128# Signer authorization OPs (and results) 

class _SignerAuthorizationOps(IntEnum):
    """Signer authorization operation codes and result codes.

    The result codes reuse the numeric values of the operation codes, so
    ``OP_SIGN_RES_MORE`` and ``OP_SIGN_RES_SUCCESS`` are enum aliases of
    ``OP_SIGVER`` and ``OP_SIGN`` respectively.
    """

    # Operations
    OP_SIGVER = 0x01
    OP_SIGN = 0x02
    # Results
    OP_SIGN_RES_MORE = 0x01
    OP_SIGN_RES_SUCCESS = 0x02

134 

135 

136# Command Ops 

class _Ops:
    """Namespace grouping the per-command operation enumerations."""

    SIGN = _SignOps
    GST = _GetStateOps
    RAV = _ResetAdvanceOps
    ADVANCE = _AdvanceOps
    UPD_ANCESTOR = _UpdateAncestorOps
    UI_ATT = _UIAttestationOps
    SIGNER_ATT = _SignerAttestationOps
    SIGNER_AUTH = _SignerAuthorizationOps

146 

147 

148# Protocol offsets 

class _Offset(IntEnum):
    """Byte offsets of the protocol fields within an APDU."""

    CLA = 0
    CMD = 1
    OP = 2
    DATA = 3

154 

155 

156# Device modes 

class _Mode(IntEnum):
    """Device modes."""

    BOOTLOADER = 0x02
    SIGNER = 0x03
    UI_HEARTBEAT = 0x04
    UNKNOWN = 0xFF

162 

163 

164# Get blockchain state flag indexes 

class _GetStateFlagOffset(IntEnum):
    """Indexes of the individual flags in a get blockchain state response."""

    IN_PROGRESS = 0
    ALREADY_VALIDATED = 1
    FOUND_BEST_BLOCK = 2

169 

170 

171# Get blockchain state constants 

class _GetState:
    """Constants for the get blockchain state command."""

    # Flag indexes within the flags response
    FLAG_OFFSET = _GetStateFlagOffset

    # State key -> hash selector sent to the device
    HASH_VALUES = {
        "best_block": 0x01,
        "newest_valid_block": 0x02,
        "ancestor_block": 0x03,
        "ancestor_receipts_root": 0x05,
        "updating.best_block": 0x81,
        "updating.newest_valid_block": 0x82,
        "updating.next_expected_block": 0x84,
    }

183 

184 

185# Sign command errors 

class _SignError(IntEnum):
    """Error codes for the sign command.

    Only the first code is spelled out; every following member takes the
    next consecutive value through ``auto()``, so member order matters.
    """

    DATA_SIZE = 0x6A87
    INPUT = auto()
    STATE = auto()
    RLP = auto()
    RLP_INT = auto()
    RLP_DEPTH = auto()
    TX_HASH_MISMATCH = auto()
    TX_VERSION = auto()
    INVALID_PATH = auto()
    DATA_SIZE_AUTH = auto()
    DATA_SIZE_NOAUTH = auto()
    NODE_VERSION = auto()
    SHARED_PREFIX_TOO_BIG = auto()
    RECEIPT_HASH_MISMATCH = auto()
    NODE_CHAINING_MISMATCH = auto()
    RECEIPT_ROOT_MISMATCH = auto()
    INVALID_SIGHASH_COMPUTATION_MODE = auto()
    INVALID_EXTRADATA_SIZE = auto()

205 

206 

207# Get public key command errors 

class _GetPubKeyError(IntEnum):
    """Error codes for the get public key command."""

    DATA_SIZE = 0x6A87

210 

211 

212# Advance blockchain and update ancestor command errors 

class _AdvanceUpdateError(IntEnum):
    """Error codes for the advance blockchain and update ancestor commands.

    After ``PROT_INVALID`` every member takes the next consecutive value
    through ``auto()``, so member order matters.
    """

    UNKNOWN = 0
    PROT_INVALID = 0x6B87
    RLP_INVALID = auto()
    BLOCK_TOO_OLD = auto()
    BLOCK_TOO_SHORT = auto()
    PARENT_HASH_INVALID = auto()
    RECEIPT_ROOT_INVALID = auto()
    BLOCK_NUM_INVALID = auto()
    BLOCK_DIFF_INVALID = auto()
    UMM_ROOT_INVALID = auto()
    BTC_HEADER_INVALID = auto()
    MERKLE_PROOF_INVALID = auto()
    BTC_CB_TXN_INVALID = auto()
    MM_RLP_LEN_MISMATCH = auto()
    BTC_DIFF_MISMATCH = auto()
    MERKLE_PROOF_MISMATCH = auto()
    MM_HASH_MISMATCH = auto()
    MERKLE_PROOF_OVERFLOW = auto()
    CB_TXN_OVERFLOW = auto()
    BUFFER_OVERFLOW = auto()
    CHAIN_MISMATCH = auto()
    TOTAL_DIFF_OVERFLOW = auto()
    ANCESTOR_TIP_MISMATCH = auto()
    CB_TXN_HASH_MISMATCH = auto()
    BROTHERS_TOO_MANY = auto()
    BROTHER_PARENT_MISMATCH = auto()
    BROTHER_SAME_AS_BLOCK = auto()
    BROTHER_ORDER_INVALID = auto()

242 

243 

class _UIError(IntEnum):
    """Error codes returned by the UI."""

    INVALID_PIN = 0x69A0

246 

247 

class _UIAttestationError(IntEnum):
    """Error codes for the UI attestation command."""

    PROT_INVALID = 0x6A01
    NO_ONBOARD = 0x6A02
    INTERNAL = 0x6A99

252 

253 

class _SignerAttestationError(IntEnum):
    """Error codes for the signer attestation command."""

    PROT_INVALID = 0x6B00
    INTERNAL = 0x6B01

257 

258 

class _SignerAuthorizationError(IntEnum):
    """Error codes for the signer authorization command."""

    PROT_INVALID = 0x6A01
    INVALID_ITERATION = 0x6A03
    INVALID_SIGNATURE = 0x6A04
    INVALID_AUTH_INVALID_INDEX = 0x6A05

264 

265 

266# Error codes 

class _Error:
    """Namespace grouping the per-command error code enumerations."""

    SIGN = _SignError
    GETPUBKEY = _GetPubKeyError
    ADVANCE = _AdvanceUpdateError
    UPD_ANCESTOR = _AdvanceUpdateError
    UI = _UIError
    UI_ATT = _UIAttestationError
    SIGNER_ATT = _SignerAttestationError
    SIGNER_AUTH = _SignerAuthorizationError

    @staticmethod
    def is_user_defined_error(code):
        """Tell whether ``code`` lies in the user-defined (RSK firmware)
        specific error code range, i.e. 0x69A0 to 0x6BFF inclusive."""
        return 0x69A0 <= code <= 0x6BFF

282 

283 

284# Sign command responses to the user 

class _SignResponse(IntEnum):
    """Sign command responses reported to the user."""

    ERROR_PATH = -1
    ERROR_BTC_TX = -2
    ERROR_TX_RECEIPT = -3
    ERROR_MERKLE_PROOF = -4
    ERROR_HASH = -5
    ERROR_UNEXPECTED = -10

292 

293 

294# Advance blockchain responses to the user 

class _AdvanceResponse(IntEnum):
    """Advance blockchain responses reported to the user."""

    # Success outcomes
    OK_TOTAL = 1
    OK_PARTIAL = 2

    # Failure outcomes
    ERROR_INIT = -1
    ERROR_COMPUTE_METADATA = -2
    ERROR_METADATA = -3
    ERROR_BLOCK_DATA = -4
    ERROR_INVALID_BLOCK = -5
    ERROR_POW_INVALID = -6
    ERROR_CHAINING_MISMATCH = -7
    ERROR_UNSUPPORTED_CHAIN = -8
    ERROR_INVALID_BROTHERS = -9
    ERROR_UNEXPECTED = -10

309 

310 

311# Update ancestor responses to the user 

class _UpdateAncestorResponse(IntEnum):
    """Update ancestor responses reported to the user."""

    # Success outcome
    OK_TOTAL = 1

    # Failure outcomes
    ERROR_INIT = -1
    ERROR_COMPUTE_METADATA = -2
    ERROR_METADATA = -3
    ERROR_BLOCK_DATA = -4
    ERROR_INVALID_BLOCK = -5
    ERROR_CHAINING_MISMATCH = -6
    ERROR_TIP_MISMATCH = -7
    ERROR_REMOVE_MM_FIELDS = -8
    ERROR_UNEXPECTED = -10

324 

325 

326# Responses 

class _Response:
    """Namespace grouping the per-command user response enumerations."""

    SIGN = _SignResponse
    ADVANCE = _AdvanceResponse
    UPD_ANCESTOR = _UpdateAncestorResponse

331 

332 

333# Onboarding constants 

class _Onboarding(IntEnum):
    """Onboarding constants."""

    SEED_LENGTH = 32
    TIMEOUT = 10

337 

338 

339# Sighash computation modes 

class SighashComputationMode(Enum):
    """Sighash computation modes.

    Each member is declared as ``(value, netvalue)``: the first element
    becomes the enum value, the second is stored on the member as
    ``netvalue``.
    """

    def __new__(cls, *args, **kwds):
        member = object.__new__(cls)
        member._value_ = args[0]
        member.netvalue = args[1]
        return member

    LEGACY = ("legacy", 0)
    SEGWIT = ("segwit", 1)

349 

350 

class HSM2DongleBaseError(RuntimeError):
    """Base class for the dongle-related errors defined in this module."""

    @property
    def message(self):
        """First constructor argument, or None when there are none."""
        return self.args[0] if self.args else None

357 

358 

class HSM2DongleError(HSM2DongleBaseError):
    """Generic dongle error."""

361 

362 

class HSM2DongleTimeoutError(HSM2DongleBaseError):
    """Error raised when an exchange with the dongle times out."""

    @staticmethod
    def is_timeout(exc):
        """Tell whether ``exc`` is exactly a CommException carrying
        status word 0x6F00 and the message "Timeout"."""
        return bool(
            type(exc) is CommException
            and exc.sw == 0x6F00
            and exc.message == "Timeout"
        )

369 

370 

class HSM2DongleCommError(HSM2DongleBaseError):
    """Error raised on a communication problem with the dongle."""

    @staticmethod
    def is_comm_error(exc):
        """Tell whether ``exc`` denotes a dongle communication problem.

        That is the case for a bare BaseException whose only argument is
        "Error while writing", for a bare OSError whose only argument is
        "read error", and for any HSM2DongleCommError instance.
        """
        # The exact-type checks are deliberate: subclasses do not qualify.
        if (
            type(exc) is BaseException
            and len(exc.args) == 1
            and exc.args[0] == "Error while writing"
        ):
            return True

        if (
            type(exc) is OSError
            and len(exc.args) == 1
            and exc.args[0] == "read error"
        ):
            return True

        return isinstance(exc, HSM2DongleCommError)

385 

386 

class HSM2DongleErrorResult(HSM2DongleBaseError):
    """Error raised when the dongle answers with an error code.

    The error code is the first (and expected only) constructor argument.
    """

    @property
    def error_code(self):
        """Error code returned by the dongle."""
        code = self.args[0]
        return code

    def __str__(self):
        code_as_hex = hex(self.error_code)
        return f"Dongle returned error code {code_as_hex}"

394 

395 

396# Handles low-level communication with a powHSM dongle 

class HSM2Dongle:
    """Handles low-level communication with a powHSM dongle."""

    # Ledger constants
    HASH_SIZE = 32

    # APDU prefix
    CLA = 0x80

    # Enumeration shorthands
    OFF = _Offset
    CMD = _Command
    MODE = _Mode
    OP = _Ops
    ERR = _Error
    GST = _GetState
    RESPONSE = _Response
    ONBOARDING = _Onboarding

    # Dongle exchange timeout
    DONGLE_TIMEOUT = 10  # seconds

    # Maximum number of pages expected to make up the UI attestation message
    MAX_PAGES_UI_ATT_MESSAGE = 4

    # Size of the iteration parameter for the signer authorization
    SIGNER_AUTH_ITERATION_SIZE = 2

    # Shorthand for externally defined commands
    ErrorResult = HSM2DongleErrorResult

425 

426 def __init__(self, debug): 

427 self.logger = logging.getLogger("dongle") 

428 self.debug = debug 

429 self.last_comm_exception = None 

430 

431 # Send command to device 

432 def _send_command(self, command, data=b"", timeout=DONGLE_TIMEOUT): 

433 self.last_comm_exception = None 

434 try: 

435 cmd = struct.pack("BB%ds" % len(data), self.CLA, command, data) 

436 self.logger.debug("Sending command: 0x%s", cmd.hex()) 

437 result = self.dongle.exchange(cmd, timeout=timeout) 

438 self.logger.debug("Received: 0x%s", result.hex()) 

439 except (CommException, BaseException) as e: 

440 # If this is a user-defined error, raise an 

441 # error result error 

442 if type(e) == CommException: 

443 self.last_comm_exception = e 

444 error_code = e.sw 

445 if _Error.is_user_defined_error(error_code): 

446 self.logger.error("Received error code: %s", hex(error_code)) 

447 raise HSM2DongleErrorResult(error_code) 

448 

449 # If this is a dongle timeout, raise a timeout error 

450 if HSM2DongleTimeoutError.is_timeout(e): 

451 raise HSM2DongleTimeoutError(str(e)) 

452 

453 # If this is a dongle communication problem, raise a comm error 

454 if HSM2DongleCommError.is_comm_error(e): 

455 raise HSM2DongleCommError(str(e)) 

456 

457 # Raise a standard error, but 

458 # report differently for a CommException and any other 

459 # type of exception 

460 if type(e) == CommException: 

461 msg = "Error sending command: %s" % str(e) 

462 self.logger.error(msg) 

463 else: 

464 msg = "Unknown error sending command: %s (of type %s)" % \ 

465 (str(e), type(e).__name__) 

466 self.logger.critical(msg) 

467 

468 raise HSM2DongleError(msg) 

469 

470 return result 

471 

472 # Send command version to be used by command classes 

473 def send_command(self, cmd, op, data, timeout=DONGLE_TIMEOUT): 

474 return self._send_command(cmd, bytes([op]) + data, timeout) 

475 

476 # Connect to the dongle 

477 def connect(self): 

478 try: 

479 self.logger.info("Connecting") 

480 self.dongle = getDongle(self.debug) 

481 self.logger.info("Connected") 

482 except CommException as e: 

483 msg = "Error connecting: %s" % e.message 

484 self.logger.error(msg) 

485 raise HSM2DongleCommError(msg) 

486 

487 # Disconnect from dongle 

488 def disconnect(self): 

489 try: 

490 self.logger.info("Disconnecting") 

491 if self.dongle and self.dongle.opened: 

492 self.dongle.close() 

493 # **** Begin hack **** 

494 # When running within a docker container, 

495 # the hidapi library fails to detect a physical 

496 # usb device reconnection. This will "hard reset" the 

497 # stack so that a potential physical device reconnection 

498 # can be detected. 

499 try: 

500 hid.hidapi_exit() 

501 except Exception: 

502 # hidapi_exit() can sometimes throw. we don't care 

503 pass 

504 # **** End hack **** 

505 self.logger.info("Disconnected") 

506 except CommException as e: 

507 msg = "Error disconnecting: %s" % e.message 

508 self.logger.error(msg) 

509 raise HSM2DongleCommError(msg) 

510 

511 # Return device mode 

512 def get_current_mode(self): 

513 try: 

514 apdu_rcv = self._send_command(self.CMD.GET_MODE) 

515 return self.MODE(apdu_rcv[1]) 

516 except HSM2DongleError: 

517 return self.MODE.UNKNOWN 

518 

519 # Echo message 

520 def echo(self): 

521 message = bytes([0x41, 0x42, 0x43]) 

522 result = bytes(self._send_command(self.CMD.ECHO, message)) 

523 # Result should be the command plus the message 

524 expected_result = bytes([self.CLA, self.CMD.ECHO]) + message 

525 return result == expected_result 

526 

527 # Return true if the hsm2 is onboarded 

528 def is_onboarded(self): 

529 self.logger.info("Sending isOnboarded") 

530 apdu_rcv = self._send_command(self.CMD.IS_ONBOARD) 

531 is_onboard = apdu_rcv[1] == 1 

532 self.logger.info("isOnboarded: %s", "yes" if is_onboard else "no") 

533 return is_onboard 

534 

535 # Attempt to onboard the device using the given seed and pin 

536 # Return the generated backup 

537 def onboard(self, seed, pin): 

538 if type(seed) != bytes or len(seed) != self.ONBOARDING.SEED_LENGTH: 

539 raise HSM2DongleError("Invalid seed given") 

540 

541 self.logger.info("Sending seed") 

542 for i, b in enumerate(seed): 

543 self._send_command(self.CMD.SEED, bytes([i, b])) 

544 

545 self.logger.info("Sending pin") 

546 self._send_pin(pin, True) 

547 

548 self.logger.info("Sending wipe") 

549 apdu_rcv = self._send_command(self.CMD.WIPE, timeout=self.ONBOARDING.TIMEOUT) 

550 

551 if apdu_rcv[1] != 2: 

552 raise HSM2DongleError("Error onboarding. Got '%s'" % apdu_rcv.hex()) 

553 

554 return True 

555 

556 # send PIN to device, optionally prepending its length 

557 def _send_pin(self, pin, prepend_length): 

558 final_pin = pin 

559 if prepend_length: 

560 final_pin = bytes([len(pin)]) + final_pin 

561 

562 for i in range(len(final_pin)): 

563 self._send_command(self.CMD.SEND_PIN, bytes([i, final_pin[i]])) 

564 

565 # unlock the device with the PIN sent 

566 def unlock(self, pin): 

567 # Send the pin, then send the unlock command per se 

568 self._send_pin(pin, prepend_length=False) 

569 apdu_rcv = self._send_command(self.CMD.UNLOCK, bytes([0x00, 0x00])) 

570 

571 # Zero indicates wrong pin. Nonzero indicates device unlocked 

572 return apdu_rcv[2] != 0 

573 

574 # replace PIN with a new one 

575 def new_pin(self, pin): 

576 try: 

577 # Send the pin, then send the replace command per se 

578 self._send_pin(pin, prepend_length=True) 

579 self._send_command(self.CMD.CHANGE_PIN) 

580 # All is good 

581 return True 

582 except HSM2DongleErrorResult as e: 

583 # Tried to set an invalid pin 

584 if e.error_code == self.ERR.UI.INVALID_PIN: 

585 return False 

586 # Something else happened 

587 raise e 

588 

589 # returns an instance of HSM2FirmwareVersion representing 

590 # the version of the currently running firmware on the HSM2 

591 # that is connected (i.e., could be either the signer or ui) 

592 def get_version(self): 

593 apdu_rcv = self._send_command(self.CMD.IS_ONBOARD) 

594 return HSM2FirmwareVersion(apdu_rcv[2], apdu_rcv[3], apdu_rcv[4]) 

595 

596 # returns the number of pin retries available 

597 def get_retries(self): 

598 apdu_rcv = self._send_command(self.CMD.RETRIES) 

599 return apdu_rcv[2] 

600 

601 # returns an instance of HSM2FirmwareParameters representing 

602 # the parameters of the currently running firmware on the HSM2 

603 # that is connected (it should be running the signer). 

604 def get_signer_parameters(self): 

605 try: 

606 apdu_rcv = self._send_command(self.CMD.GET_PARAMETERS) 

607 return HSM2FirmwareParameters.from_dongle_format(apdu_rcv[self.OFF.DATA:]) 

608 except ValueError as e: 

609 msg = "While getting signer firmware parameters: %s" % str(e) 

610 self.logger.error(msg) 

611 raise HSM2DongleError(msg) 

612 

613 # exit the ledger nano S menu 

614 def exit_menu(self, autoexec=True): 

615 self._send_command( 

616 self.CMD.EXIT_MENU if autoexec else self.CMD.EXIT_MENU_NO_AUTOEXEC, 

617 bytes([0x00, 0x00]), 

618 ) 

619 

620 # exit the current app 

621 # could either be the UI bootloader, UI heartbeat or Signer 

622 def exit_app(self): 

623 self._send_command(self.CMD.EXIT_MENU) 

624 

625 # get the public key for a bip32 path 

626 # key_id: BIP32Path 

627 def get_public_key(self, key_id): 

628 publicKey = self._send_command(self.CMD.GET_PUBLIC_KEY, key_id.to_binary()) 

629 return publicKey.hex() 

630 

631 # Ask the device to sign a specific input of a given unsigned bitcoin transaction 

632 # using the given RSK transaction receipt as an authorization for the signature. 

633 # key_id: BIP32Path 

634 # rsk_tx_receipt: hex string 

635 # btc_tx: hex string 

636 # receipt_merkle_proof: list 

637 # input_index: int 

638 # sighash_computation_mode: a SighashComputationMode instance 

639 # witness_script: hex string 

640 # outpoint_value: int 

641 def sign_authorized( 

642 self, key_id, rsk_tx_receipt, receipt_merkle_proof, btc_tx, input_index, 

643 sighash_computation_mode, witness_script, outpoint_value 

644 ): 

645 # *** Signing protocol *** 

646 # The order in which things are required and then sent is: 

647 # 1. BIP32 path & BTC tx input index (single message) 

648 # 2. BTC transaction (several messages, as required by ledger) 

649 # 3. RSK transaction receipt (several messages, as required by ledger) 

650 # 4. RSK Tx receipt merkle proof (several messages, as required by ledger) 

651 # (Note: in theory, one could depend only on the order in which 

652 # the ledger requires data and send it blindly. In practice, 

653 # we know exactly the order in which the device will require 

654 # the data, and use that order to validate it as we go.) 

655 # 

656 # During these exchanges, an exception can be raised at any moment, which 

657 # would signal failure signing. 

658 # Specific error codes come with HSM2DongleErrorResult 

659 # exception instances and are handled accordingly. Anything else 

660 # is treated as an unexpected error and is let for the calling layer 

661 # to handle. 

662 

663 # Step 1. Send path and input index 

664 key_id_bytes = key_id.to_binary() 

665 input_index_bytes = input_index.to_bytes(4, byteorder="little", signed=False) 

666 data = bytes([self.OP.SIGN.PATH]) + key_id_bytes + input_index_bytes 

667 try: 

668 self.logger.debug("Sign: sending path - %s", data.hex()) 

669 response = self._send_command(self.CMD.SIGN, data) 

670 

671 # We expect the device to ask for the BTC tx next. 

672 # If this doesn't happen, error out 

673 if response[self.OFF.OP] != self.OP.SIGN.BTC_TX: 

674 self.logger.error("Sign: unexpected response %s", response.hex()) 

675 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED) 

676 

677 # How many bytes to send in the next message 

678 bytes_requested = response[self.OFF.DATA] 

679 except HSM2DongleErrorResult as e: 

680 self.logger.error("Sign returned: %s", hex(e.error_code)) 

681 if e.error_code in [ 

682 self.ERR.SIGN.DATA_SIZE, 

683 self.ERR.SIGN.DATA_SIZE_AUTH, 

684 self.ERR.SIGN.DATA_SIZE_NOAUTH, 

685 ]: 

686 return (False, self.RESPONSE.SIGN.ERROR_PATH) 

687 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED) 

688 

689 # Step 2. Send BTC transaction and extra data 

690 # Prefix the BTC transaction with the total length of the payload encoded as a 

691 # 4 bytes little endian unsigned integer. The total length should include 

692 # those 4 bytes plus 2 bytes for the length of the extradata and 1 byte for 

693 # the sighash computation mode 

694 try: 

695 PAYLOADLENGTH_LENGTH = 4 

696 SIGHASH_COMPUTATION_MODE_LENGTH = 1 

697 EXTRADATALENGTH_LENGTH = 2 

698 OUTPOINT_VALUE_LENGTH = 8 

699 

700 btc_tx_bytes = bytes.fromhex(btc_tx) 

701 

702 scm_bytes = sighash_computation_mode.netvalue.to_bytes( 

703 SIGHASH_COMPUTATION_MODE_LENGTH, 

704 byteorder='little', signed=False 

705 ) 

706 

707 ed_bytes = b"" 

708 

709 if sighash_computation_mode == SighashComputationMode.SEGWIT: 

710 ov_bytes = outpoint_value.to_bytes( 

711 OUTPOINT_VALUE_LENGTH, 

712 byteorder='little', signed=False 

713 ) 

714 

715 ws_bytes = bytes.fromhex(witness_script) 

716 ws_length_bytes = bytes.fromhex(encode_varint(len(ws_bytes))) 

717 

718 ed_bytes = ws_length_bytes + ws_bytes + ov_bytes 

719 

720 edl_bytes = len(ed_bytes).to_bytes( 

721 EXTRADATALENGTH_LENGTH, 

722 byteorder='little', signed=False 

723 ) 

724 

725 payload_length = \ 

726 PAYLOADLENGTH_LENGTH + \ 

727 SIGHASH_COMPUTATION_MODE_LENGTH + \ 

728 EXTRADATALENGTH_LENGTH + \ 

729 len(btc_tx_bytes) 

730 

731 payload_length_bytes = payload_length.to_bytes( 

732 PAYLOADLENGTH_LENGTH, byteorder="little", signed=False 

733 ) 

734 

735 data = payload_length_bytes + scm_bytes + edl_bytes + btc_tx_bytes + ed_bytes 

736 

737 response = self._send_data_in_chunks( 

738 command=self.CMD.SIGN, 

739 operation=self.OP.SIGN.BTC_TX, 

740 next_operations=[self.OP.SIGN.TX_RECEIPT], 

741 data=data, 

742 expect_full_data=True, 

743 initial_bytes=bytes_requested, 

744 operation_name="sign", 

745 data_description="BTC tx", 

746 ) 

747 

748 if not response[0]: 

749 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED) 

750 

751 bytes_requested = response[1][self.OFF.DATA] 

752 except HSM2DongleErrorResult as e: 

753 self.logger.error("Sign returned: %s", hex(e.error_code)) 

754 if e.error_code in [ 

755 self.ERR.SIGN.INPUT, 

756 self.ERR.SIGN.DATA_SIZE, 

757 self.ERR.SIGN.TX_HASH_MISMATCH, 

758 self.ERR.SIGN.TX_VERSION, 

759 self.ERR.SIGN.INVALID_SIGHASH_COMPUTATION_MODE, 

760 self.ERR.SIGN.INVALID_EXTRADATA_SIZE, 

761 ]: 

762 return (False, self.RESPONSE.SIGN.ERROR_BTC_TX) 

763 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED) 

764 

765 # Step 3. Send transaction receipt 

766 try: 

767 response = self._send_data_in_chunks( 

768 command=self.CMD.SIGN, 

769 operation=self.OP.SIGN.TX_RECEIPT, 

770 next_operations=[self.OP.SIGN.MERKLE_PROOF], 

771 data=bytes.fromhex(rsk_tx_receipt), 

772 expect_full_data=True, 

773 initial_bytes=bytes_requested, 

774 operation_name="sign", 

775 data_description="tx receipt", 

776 ) 

777 

778 if not response[0]: 

779 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED) 

780 

781 bytes_requested = response[1][self.OFF.DATA] 

782 except HSM2DongleErrorResult as e: 

783 self.logger.error("Sign returned: %s", hex(e.error_code)) 

784 if e.error_code in [ 

785 self.ERR.SIGN.STATE, 

786 self.ERR.SIGN.RLP, 

787 self.ERR.SIGN.RLP_INT, 

788 self.ERR.SIGN.RLP_DEPTH, 

789 self.ERR.SIGN.DATA_SIZE, 

790 ]: 

791 return (False, self.RESPONSE.SIGN.ERROR_TX_RECEIPT) 

792 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED) 

793 

794 # Step 4. Send tx receipt merkle proof 

795 # The format for the receipts merkle proof is as follows: 

796 # 1 byte for the number of nodes 

797 # For each node: 1 byte for the node length + the node bytes. 

798 try: 

799 if len(receipt_merkle_proof) > 255: 

800 raise ValueError("Too many nodes") 

801 

802 merkle_proof_bytes = bytes([len(receipt_merkle_proof)]) 

803 for node in receipt_merkle_proof: 

804 node_bytes = bytes.fromhex(node) 

805 if len(node_bytes) > 255: 

806 raise ValueError("Node too big: %s" % node) 

807 merkle_proof_bytes = ( 

808 merkle_proof_bytes + bytes([len(node_bytes)]) + node_bytes 

809 ) 

810 except ValueError as e: 

811 self.logger.error("Sign: invalid receipts merkle proof: %s", str(e)) 

812 return (False, self.RESPONSE.SIGN.ERROR_MERKLE_PROOF) 

813 

814 try: 

815 response = self._send_data_in_chunks( 

816 command=self.CMD.SIGN, 

817 operation=self.OP.SIGN.MERKLE_PROOF, 

818 next_operations=[self.OP.SIGN.SUCCESS], 

819 data=merkle_proof_bytes, 

820 expect_full_data=True, 

821 initial_bytes=bytes_requested, 

822 operation_name="sign", 

823 data_description="receipts merkle proof", 

824 ) 

825 

826 if not response[0]: 

827 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED) 

828 except HSM2DongleErrorResult as e: 

829 self.logger.error("Sign returned: %s", hex(e.error_code)) 

830 if e.error_code in [ 

831 self.ERR.SIGN.DATA_SIZE, 

832 self.ERR.SIGN.STATE, 

833 self.ERR.SIGN.NODE_VERSION, 

834 self.ERR.SIGN.SHARED_PREFIX_TOO_BIG, 

835 self.ERR.SIGN.RECEIPT_HASH_MISMATCH, 

836 self.ERR.SIGN.NODE_CHAINING_MISMATCH, 

837 self.ERR.SIGN.RECEIPT_ROOT_MISMATCH, 

838 ]: 

839 return (False, self.RESPONSE.SIGN.ERROR_MERKLE_PROOF) 

840 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED) 

841 

842 # If we get here, we should have a signature in the data part. 

843 # Return success along with it. 

844 try: 

845 return (True, HSM2DongleSignature(response[1][self.OFF.DATA:])) 

846 except Exception as e: 

847 self.logger.error("Error parsing signature: %s", str(e)) 

848 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED) 

849 

850 # Ask the device to sign a specific hash without any authorization. 

851 # key_id: BIP32Path 

852 # hash: hex string 

853 def sign_unauthorized(self, key_id, hash): 

854 # *** Signing protocol *** 

855 # This signing method requires a single message that contains the 

856 # BIP32 path & hash to sign. 

857 # 

858 # An exception can be raised, which 

859 # would signal failure signing. Specific error codes 

860 # come with HSM2DongleErrorResult 

861 # exception instances and are handled accordingly. Anything else 

862 # is treated as an unexpected error and is let for the calling layer 

863 # to handle. 

864 

865 try: 

866 hash_bytes = bytes.fromhex(hash) 

867 except ValueError: 

868 self.logger.error("Sign: invalid hash - %s", hash) 

869 return (False, self.RESPONSE.SIGN.ERROR_HASH) 

870 

871 # Send path and hash to sign 

872 try: 

873 key_id_bytes = key_id.to_binary() 

874 data = bytes([self.OP.SIGN.PATH]) + key_id_bytes + hash_bytes 

875 self.logger.debug("Sign: sending path and hash - %s", data.hex()) 

876 response = self._send_command(self.CMD.SIGN, data) 

877 

878 # Special case: if the device asks for a BTC transaction, then 

879 # there's a case of both invalid path and invalid hash. Report invalid hash 

880 if response[self.OFF.OP] == self.OP.SIGN.BTC_TX: 

881 return (False, self.RESPONSE.SIGN.ERROR_HASH) 

882 

883 # We expect the device to report success signing 

884 # If this doesn't happen, error out 

885 if response[self.OFF.OP] != self.OP.SIGN.SUCCESS: 

886 self.logger.error("Sign: unexpected response %s", response.hex()) 

887 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED) 

888 except HSM2DongleErrorResult as e: 

889 self.logger.error("Sign returned: %s", hex(e.error_code)) 

890 if e.error_code in [self.ERR.SIGN.DATA_SIZE, self.ERR.SIGN.DATA_SIZE_NOAUTH]: 

891 return (False, self.RESPONSE.SIGN.ERROR_HASH) 

892 elif e.error_code in [ 

893 self.ERR.SIGN.INVALID_PATH, 

894 self.ERR.SIGN.DATA_SIZE_AUTH, 

895 ]: 

896 return (False, self.RESPONSE.SIGN.ERROR_PATH) 

897 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED) 

898 

899 # If we get here, we should have a signature in the data part. 

900 # Return success along with it. 

901 try: 

902 return (True, HSM2DongleSignature(response[self.OFF.DATA:])) 

903 except Exception as e: 

904 self.logger.error("Error parsing signature: %s", str(e)) 

905 return (False, self.RESPONSE.SIGN.ERROR_UNEXPECTED) 

906 

907 def get_blockchain_state(self): 

908 state = {} 

909 

910 # Get hashes 

911 for (key, hash_cmd) in self.GST.HASH_VALUES.items(): 

912 self.logger.info("Getting hash value for '%s'", key) 

913 result = self._send_command( 

914 self.CMD.GET_STATE, bytes([self.OP.GST.HASH, hash_cmd]) 

915 ) 

916 

917 # Validate result 

918 if ( 

919 result[self.OFF.OP] != self.OP.GST.HASH 

920 or result[self.OFF.DATA] != hash_cmd 

921 or len(result[self.OFF.DATA + 1:]) != self.HASH_SIZE 

922 ): 

923 msg = "Invalid response for hash: %s" % result.hex() 

924 self.logger.error(msg) 

925 raise HSM2DongleError(msg) 

926 

927 state[key] = result[self.OFF.DATA + 1:].hex() 

928 

929 # Get difficulty 

930 self.logger.info("Getting difficulty") 

931 result = self._send_command(self.CMD.GET_STATE, bytes([self.OP.GST.DIFF])) 

932 if result[self.OFF.OP] != self.OP.GST.DIFF: 

933 msg = "Invalid response for difficulty: %s" % result.hex() 

934 self.logger.error(msg) 

935 raise HSM2DongleError(msg) 

936 

937 state["updating.total_difficulty"] = int.from_bytes( 

938 result[self.OFF.DATA:], byteorder="big", signed=False 

939 ) 

940 

941 # Get flags 

942 self.logger.info("Getting flags") 

943 result = self._send_command(self.CMD.GET_STATE, bytes([self.OP.GST.FLAGS])) 

944 if result[self.OFF.OP] != self.OP.GST.FLAGS or len(result[self.OFF.DATA:]) != 3: 

945 msg = "Invalid response for flags: %s" % result.hex() 

946 self.logger.error(msg) 

947 raise HSM2DongleError(msg) 

948 

949 state["updating.in_progress"] = bool( 

950 result[self.OFF.DATA + self.GST.FLAG_OFFSET.IN_PROGRESS] 

951 ) 

952 state["updating.already_validated"] = bool( 

953 result[self.OFF.DATA + self.GST.FLAG_OFFSET.ALREADY_VALIDATED] 

954 ) 

955 state["updating.found_best_block"] = bool( 

956 result[self.OFF.DATA + self.GST.FLAG_OFFSET.FOUND_BEST_BLOCK] 

957 ) 

958 

959 return state 

960 

def reset_advance_blockchain(self):
    """Ask the device to reset advance blockchain.

    Sends CMD.RESET_AB with the RAV.INIT operation and expects the reply's
    operation byte to be RAV.DONE.

    Returns True on success.
    Raises HSM2DongleError (after logging it) on any other reply.
    """
    self.logger.info("Resetting advance blockchain")
    reply = self._send_command(self.CMD.RESET_AB, bytes([self.OP.RAV.INIT]))

    # Happy path first: the device acknowledged the reset
    if reply[self.OFF.OP] == self.OP.RAV.DONE:
        return True

    msg = "Invalid response for reset advance blockchain: %s" % reply.hex()
    self.logger.error(msg)
    raise HSM2DongleError(msg)

970 

971 # Ask the device to update its blockchain references by processing 

972 # a given set of blocks and their brothers. 

973 # blocks: list of hex strings 

974 # (each hex string is a raw block header, 

975 # which should *always* include merge mining fields) 

976 # brothers: list of list of hex strings 

977 # (each list of hex strings is the block's brothers' headers 

978 # for the corresponding block header in the same position 

979 # of the blocks list) 

def advance_blockchain(self, blocks, brothers):
    """Ask the device to process the given blocks and their brothers.

    Every group of brothers is sorted by block hash (compared as raw bytes)
    and the exchange itself is delegated to _do_block_operation, along with
    the table that translates device error codes into advance blockchain
    responses.

    blocks: list of hex strings (raw block headers)
    brothers: list of lists of hex strings (one list of brother headers
              per entry of blocks, in the same position)

    Returns whatever _do_block_operation returns.
    """
    codes = self.ERR.ADVANCE
    results = self.RESPONSE.ADVANCE

    def hash_as_bytes(header):
        return bytes.fromhex(get_block_hash(header))

    # Each group of brothers goes to the device ordered by block hash
    sorted_brothers = [sorted(group, key=hash_as_bytes) for group in brothers]

    # Device error codes, grouped by the response they translate into
    invalid_block_codes = (
        codes.BUFFER_OVERFLOW,
        codes.MERKLE_PROOF_OVERFLOW,
        codes.CB_TXN_OVERFLOW,
        codes.RLP_INVALID,
        codes.BLOCK_TOO_SHORT,
        codes.PARENT_HASH_INVALID,
        codes.UMM_ROOT_INVALID,
        codes.BTC_HEADER_INVALID,
        codes.MERKLE_PROOF_INVALID,
        codes.BLOCK_DIFF_INVALID,
        codes.BLOCK_NUM_INVALID,
        codes.BLOCK_TOO_OLD,
        codes.MM_RLP_LEN_MISMATCH,
    )
    pow_invalid_codes = (
        codes.MERKLE_PROOF_MISMATCH,
        codes.BTC_CB_TXN_INVALID,
        codes.MM_HASH_MISMATCH,
        codes.BTC_DIFF_MISMATCH,
        codes.CB_TXN_HASH_MISMATCH,
    )
    invalid_brothers_codes = (
        codes.BROTHERS_TOO_MANY,
        codes.BROTHER_PARENT_MISMATCH,
        codes.BROTHER_SAME_AS_BLOCK,
        codes.BROTHER_ORDER_INVALID,
    )

    chunk_error_mapping = dict.fromkeys(
        invalid_block_codes, results.ERROR_INVALID_BLOCK
    )
    chunk_error_mapping.update(
        dict.fromkeys(pow_invalid_codes, results.ERROR_POW_INVALID)
    )
    chunk_error_mapping.update(
        dict.fromkeys(invalid_brothers_codes, results.ERROR_INVALID_BROTHERS)
    )
    chunk_error_mapping[codes.CHAIN_MISMATCH] = results.ERROR_CHAINING_MISMATCH
    chunk_error_mapping[codes.TOTAL_DIFF_OVERFLOW] = results.ERROR_UNSUPPORTED_CHAIN
    chunk_error_mapping[codes.PROT_INVALID] = results.ERROR_BLOCK_DATA

    return self._do_block_operation(
        "advance",
        blocks,
        sorted_brothers,
        self.CMD.ADVANCE,
        self.OP.ADVANCE,
        codes,
        results,
        chunk_error_mapping,
    )

1029 

1030 # Ask the device to update its ancestor block and ancestor receipts root 

1031 # references by processing a given set of blocks. 

1032 # blocks: list of hex strings 

1033 # (each hex string is a raw block header, 

1034 # which doesn't need to include merge mining fields - 

1035 # those will be stripped for efficiency before being sent 

1036 # to the device anyway) 

def update_ancestor(self, blocks):
    """Ask the device to process the given blocks for an ancestor update.

    Merge mining fields are removed from every header (when present)
    before the exchange is delegated to _do_block_operation.

    blocks: list of hex strings (raw block headers)

    Returns (False, ERROR_REMOVE_MM_FIELDS) if stripping the merge mining
    fields raises ValueError; otherwise whatever _do_block_operation returns.
    """
    codes = self.ERR.UPD_ANCESTOR
    results = self.RESPONSE.UPD_ANCESTOR

    # Optimization: headers are sent without their merge mining fields
    try:
        self.logger.info("Removing merge mining fields from %d blocks", len(blocks))
        stripped_blocks = [remove_mm_fields_if_present(header) for header in blocks]
    except ValueError as e:
        self.logger.error("While removing merge mining fields: %s", str(e))
        return (False, results.ERROR_REMOVE_MM_FIELDS)

    # Device error codes that all translate into an invalid block response
    invalid_block_codes = (
        codes.BUFFER_OVERFLOW,
        codes.RLP_INVALID,
        codes.BLOCK_TOO_SHORT,
        codes.PARENT_HASH_INVALID,
        codes.RECEIPT_ROOT_INVALID,
        codes.BTC_HEADER_INVALID,
        codes.BLOCK_NUM_INVALID,
        codes.BLOCK_TOO_OLD,
        codes.MM_RLP_LEN_MISMATCH,
    )
    chunk_error_mapping = dict.fromkeys(
        invalid_block_codes, results.ERROR_INVALID_BLOCK
    )
    chunk_error_mapping[codes.ANCESTOR_TIP_MISMATCH] = results.ERROR_TIP_MISMATCH
    chunk_error_mapping[codes.CHAIN_MISMATCH] = results.ERROR_CHAINING_MISMATCH
    chunk_error_mapping[codes.PROT_INVALID] = results.ERROR_BLOCK_DATA

    return self._do_block_operation(
        "updancestor",
        stripped_blocks,
        None,
        self.CMD.UPD_ANCESTOR,
        self.OP.UPD_ANCESTOR,
        codes,
        results,
        chunk_error_mapping,
    )

1073 

def get_ui_attestation(self, ud_value_hex):
    """Gather the UI attestation from the device.

    The exchange, all of it over CMD.UI_ATT, is:
      1. Ask for the UI application hash.
      2. Send the user-defined value.
      3. Read the attestation message page by page, until the device
         flags that there are no more pages.
      4. Ask for the attestation itself.

    ud_value_hex: user-defined value as a hex string

    Returns a dict with the hex-encoded "app_hash", "message" and
    "signature".
    Raises HSM2DongleError if the message takes
    MAX_PAGES_UI_ATT_MESSAGE pages without the device flagging the end.
    Raises ValueError if ud_value_hex is not valid hexadecimal.
    """
    # Parse hexadecimal values
    ud_value = bytes.fromhex(ud_value_hex)

    # Get UI hash
    ui_hash = self._send_command(
        self.CMD.UI_ATT, bytes([self.OP.UI_ATT.OP_APP_HASH])
    )[self.OFF.DATA:]

    # Send UD value
    data = bytes([self.OP.UI_ATT.OP_UD_VALUE]) + ud_value
    self._send_command(self.CMD.UI_ATT, data)

    # Retrieve message
    page = 0
    message = b""
    while True:
        if page == self.MAX_PAGES_UI_ATT_MESSAGE:
            # The limit must be interpolated through a conversion specifier:
            # applying % to a string without one raises TypeError and would
            # hide the actual error from the caller
            msg = (
                "Maximum number of UI attestation pages exceeded (%d)"
                % self.MAX_PAGES_UI_ATT_MESSAGE
            )
            self.logger.error(msg)
            raise HSM2DongleError(msg)
        data = bytes([self.OP.UI_ATT.OP_GET_MSG, page])
        response = self._send_command(self.CMD.UI_ATT, data)
        page += 1
        # First data byte flags whether more pages follow (zero means this
        # was the last one); the remaining bytes are this page's content
        message += response[self.OFF.DATA + 1:]
        if response[self.OFF.DATA] == 0:
            break

    # Retrieve attestation
    attestation = self._send_command(
        self.CMD.UI_ATT, bytes([self.OP.UI_ATT.OP_GET])
    )[self.OFF.DATA:]

    return {
        "app_hash": ui_hash.hex(),
        "message": message.hex(),
        "signature": attestation.hex(),
    }

1113 

def get_signer_attestation(self):
    """Gather the signer attestation from the device.

    Issues three CMD.SIGNER_ATT requests, in this order: application hash,
    attestation and message. The data portion of each reply is kept.

    Returns a dict with the hex-encoded "app_hash", "message" and
    "signature".
    """
    att_ops = self.OP.SIGNER_ATT

    def query(op):
        # Single request; only the data portion of the reply is of interest
        return self._send_command(self.CMD.SIGNER_ATT, bytes([op]))[self.OFF.DATA:]

    signer_hash = query(att_ops.OP_APP_HASH)
    signature = query(att_ops.OP_GET)
    message = query(att_ops.OP_GET_MESSAGE)

    return {
        "app_hash": signer_hash.hex(),
        "message": message.hex(),
        "signature": signature.hex(),
    }

1135 

def get_signer_heartbeat(self, ud_value):
    """Run HSM2SignerHeartbeat over this dongle with ud_value.

    Returns whatever the command's run method returns.
    """
    heartbeat = HSM2SignerHeartbeat(self)
    return heartbeat.run(ud_value)

1138 

def get_ui_heartbeat(self, ud_value):
    """Run HSM2UIHeartbeat over this dongle with ud_value.

    Returns whatever the command's run method returns.
    """
    heartbeat = HSM2UIHeartbeat(self)
    return heartbeat.run(ud_value)

1141 

def authorize_signer(self, signer_authorization):
    """Authorize a signer version on the device.

    The signer version (hash followed by the big-endian iteration) is sent
    first; signatures are then sent one at a time and sending stops as soon
    as the device reports success.

    signer_authorization: object exposing signer_version (with hash, a hex
                          string, and iteration, an integer) and signatures
                          (an iterable of hex strings)

    Returns True once the device reports success.
    Raises HSM2DongleError if every signature was sent (or none was given)
    without the device reporting success.
    """
    auth_ops = self.OP.SIGNER_AUTH
    version = signer_authorization.signer_version

    # Send signer version
    version_payload = (
        bytes([auth_ops.OP_SIGVER])
        + bytes.fromhex(version.hash)
        + version.iteration.to_bytes(
            self.SIGNER_AUTH_ITERATION_SIZE, byteorder='big', signed=False
        )
    )
    self._send_command(self.CMD.SIGNER_AUTH, version_payload)

    # Send signatures one by one, stopping at the first reported success
    for signature in signer_authorization.signatures:
        signature_payload = bytes([auth_ops.OP_SIGN]) + bytes.fromhex(signature)
        reply = self._send_command(self.CMD.SIGNER_AUTH, signature_payload)
        if reply[self.OFF.DATA] == auth_ops.OP_SIGN_RES_SUCCESS:
            return True

    # Success always returns from within the loop, so getting here means
    # the signatures given (if any) were not enough
    raise HSM2DongleError("Not enough signatures given. "
                          "Signer authorization failed")

1167 

1168 # Used both for advance blockchain and update ancestor given the protocol 

1169 # is very similar 

def _do_block_operation(
    self,
    operation_name,
    blocks,
    brothers,
    command,
    ops,
    errors,
    responses,
    chunk_error_mapping,
):
    """Run a block operation (advance blockchain or update ancestor).

    operation_name: label used in log messages
    blocks: list of hex strings, each one a block header to send
    brothers: list of lists of hex strings, indexed by block position
              (only read for advance blockchain; update_ancestor passes
              None)
    command: device command to issue; brothers and partial success are
             only handled when it equals self.CMD.ADVANCE
    ops: operation codes for the command
    errors: device error codes for the command
    responses: result codes handed back to the caller
    chunk_error_mapping: dict translating device error codes into result
                         codes (forwarded to _send_block_header)

    Returns a tuple: (True, responses.OK_PARTIAL) or
    (True, responses.OK_TOTAL) on success, (False, <error result code>)
    on failure.
    Raises HSM2DongleError if all blocks were sent and the device reported
    neither success nor partial success.
    """
    # *** Block operation protocol ***
    # The order in which things are required and then sent is:
    # 1. Initialization, where the total number of blocks to send is sent.
    # 2. For each block header:
    # 2.1. Block metadata (single message):
    #      - MM payload size in bytes
    #        (see the block_utils.rlp_mm_payload_size method for details on this)
    #      - In case of an advance blockchain operation,
    #        coinbase transaction hash (see the block_utils.coinbase_tx_get_hash
    #        for details on this)
    # 2.2. Block chunks: block header pieces as requested by the ledger.
    # 2.3. Brothers -- only for advance blockchain:
    # 2.3.1 Brothers metadata (single message):
    #       - Brother count
    # 2.3.2 For each brother (if brother count was greater than zero):
    # 2.3.2.1. Brother metadata (single message):
    #          - MM payload size in bytes
    #            (see the block_utils.rlp_mm_payload_size method for details on this)
    #          - Coinbase transaction hash (see the block_utils.coinbase_tx_get_hash
    #            for details on this)
    # 2.3.2.2. Brother chunks: brother header pieces as requested by the ledger.
    #
    # During these exchanges, an exception can be raised at any moment, which
    # would signal failure. Specific error codes come with HSM2DongleErrorResult
    # exception instances and are handled accordingly. Anything else
    # is treated as an unexpected error and is left for the calling layer
    # to handle.

    # Step 1. Send initialization
    # (the block count travels as a 4-byte big-endian unsigned integer)
    num_blocks_bytes = len(blocks).to_bytes(4, byteorder="big", signed=False)
    data = bytes([ops.INIT]) + num_blocks_bytes
    try:
        self.logger.info(
            "%s: sending initialization - %s", operation_name.capitalize(), data.hex()
        )
        response = self._send_command(command, data)

        # We expect the device to ask for block metadata next.
        # If this doesn't happen, error out
        if response[self.OFF.OP] != ops.HEADER_META:
            self.logger.error(
                "%s: unexpected response %s",
                operation_name.capitalize(),
                response.hex(),
            )
            return (False, responses.ERROR_UNEXPECTED)
    except HSM2DongleErrorResult as e:
        self.logger.error(
            "%s returned: %s", operation_name.capitalize(), hex(e.error_code)
        )
        if e.error_code in [errors.PROT_INVALID]:
            return (False, responses.ERROR_INIT)
        return (False, responses.ERROR_UNEXPECTED)

    # Step 2. Send blocks (and brothers, if any)
    total_blocks = len(blocks)
    for block_number, block in enumerate(blocks, 1):
        self.logger.info(
            "%s: sending block #%d/%d",
            operation_name.capitalize(),
            block_number,
            total_blocks,
        )

        # From here on, response is a (success, payload) pair: on success the
        # payload is the last raw device reply, on failure it is a result code
        response = self._send_block_header(
            operation_name=operation_name,
            header_name="block",
            block=block,
            command=command,
            ops=ops,
            op_meta=ops.HEADER_META,
            op_chunk=ops.HEADER_CHUNK,
            responses=responses,
            errors=errors,
            chunk_error_mapping=chunk_error_mapping
        )
        if not response[0]:
            return response

        # Step 2.3. Send brothers
        # *** Only for advance blockchain and if requested by the dongle ***
        if command == self.CMD.ADVANCE and \
           response[1][self.OFF.OP] == ops.BROTHER_LIST_META:

            # Step 2.3.1. Send brother list metadata
            # (block_number is 1-based, hence the -1; this expects brothers
            # to hold an entry for every block)
            brother_list = brothers[block_number-1]
            brother_count = len(brother_list)
            # The brother count travels as a single unsigned byte
            brother_count_bytes = brother_count.to_bytes(1,
                                                         byteorder="big",
                                                         signed=False)
            data = bytes([ops.BROTHER_LIST_META]) + brother_count_bytes
            try:
                self.logger.info(
                    "%s: sending brother list metadata - %s",
                    operation_name.capitalize(), data.hex()
                )
                # Wrap the raw reply so that it has the same shape as the
                # result of _send_block_header (payload at index 1), which
                # is what the checks further down index into
                response = [None, self._send_command(command, data)]

                # If we have at least one brother,
                # we expect the device to ask for brother metadata next.
                # If this doesn't happen, error out
                if brother_count > 0 and response[1][self.OFF.OP] != ops.BROTHER_META:
                    self.logger.error(
                        "%s: unexpected response %s",
                        operation_name.capitalize(),
                        response[1].hex(),
                    )
                    return (False, responses.ERROR_UNEXPECTED)
            except HSM2DongleErrorResult as e:
                self.logger.error(
                    "%s returned: %s", operation_name.capitalize(), hex(e.error_code)
                )
                if e.error_code in [errors.PROT_INVALID, errors.BROTHERS_TOO_MANY]:
                    return (False, responses.ERROR_INVALID_BROTHERS)
                return (False, responses.ERROR_UNEXPECTED)

            # Step 2.3.2. Send each brother
            for brother_number, brother in enumerate(brother_list, 1):
                self.logger.info(
                    "%s: sending brother #%d/%d",
                    operation_name.capitalize(),
                    brother_number,
                    brother_count,
                )

                response = self._send_block_header(
                    operation_name=operation_name,
                    header_name="brother",
                    block=brother,
                    command=command,
                    ops=ops,
                    op_meta=ops.BROTHER_META,
                    op_chunk=ops.BROTHER_CHUNK,
                    responses=responses,
                    errors=errors,
                    chunk_error_mapping=chunk_error_mapping
                )
                if not response[0]:
                    return response

        # Partial success?
        if command == self.CMD.ADVANCE and response[1][self.OFF.OP] == ops.PARTIAL:
            self.logger.info("%s: partial success", operation_name.capitalize())
            return (True, responses.OK_PARTIAL)

        # Success?
        if response[1][self.OFF.OP] == ops.SUCCESS:
            self.logger.info("%s: total success", operation_name.capitalize())
            return (True, responses.OK_TOTAL)

    # We shouldn't be able to ever reach this point
    msg = "%s: unexpected state" % operation_name.capitalize()
    self.logger.fatal(msg)
    raise HSM2DongleError(msg)

1335 

1336 # Send an individual block header to the device, including computing 

1337 # and sending metadata 

1338 # This is used both for advance blockchain (block and brother headers) 

1339 # and update ancestor given the protocol is very similar 

1340 def _send_block_header( 

1341 self, 

1342 operation_name, 

1343 header_name, 

1344 block, 

1345 command, 

1346 ops, 

1347 op_meta, 

1348 op_chunk, 

1349 responses, 

1350 errors, 

1351 chunk_error_mapping 

1352 ): 

1353 # A. Compute and send block metadata 

1354 # (this will also validate that the block is a valid RLP-encoded list 

1355 # of the proper size) 

1356 try: 

1357 # RLP payload size for merge mining hash 

1358 mm_payload_size = rlp_mm_payload_size(block) 

1359 self.logger.debug( 

1360 "%s metadata: MM payload length %d", 

1361 header_name.capitalize(), 

1362 mm_payload_size) 

1363 mm_payload_size_bytes = mm_payload_size.to_bytes( 

1364 2, byteorder="big", signed=False 

1365 ) 

1366 # Coinbase transaction hash 

1367 cb_txn_hash = bytes([]) 

1368 if command == self.CMD.ADVANCE: 

1369 cb_txn_hash = bytes.fromhex( 

1370 coinbase_tx_get_hash(get_coinbase_txn(block)) 

1371 ) 

1372 self.logger.debug( 

1373 "%s Metadata: CB txn hash: %s", 

1374 header_name.capitalize(), 

1375 cb_txn_hash.hex()) 

1376 # Wrap and send 

1377 data = bytes([op_meta]) + mm_payload_size_bytes + cb_txn_hash 

1378 self.logger.info( 

1379 "%s: sending %s metadata - %s", 

1380 operation_name.capitalize(), 

1381 header_name, 

1382 data.hex() 

1383 ) 

1384 response = self._send_command(command, data) 

1385 

1386 # We expect the device to ask for a block chunk next. 

1387 # If this doesn't happen, error out 

1388 if response[self.OFF.OP] != op_chunk: 

1389 self.logger.error( 

1390 "%s: unexpected response %s", 

1391 operation_name.capitalize(), 

1392 response.hex(), 

1393 ) 

1394 return (False, responses.ERROR_UNEXPECTED) 

1395 

1396 # How many bytes to send as the first block chunk 

1397 bytes_requested = response[self.OFF.DATA] 

1398 except ValueError as e: 

1399 self.logger.error("Computing %s metadata: %s", header_name, str(e)) 

1400 return (False, responses.ERROR_COMPUTE_METADATA) 

1401 except HSM2DongleErrorResult as e: 

1402 self.logger.error( 

1403 "%s returned: %s", operation_name.capitalize(), hex(e.error_code) 

1404 ) 

1405 if e.error_code in [errors.PROT_INVALID]: 

1406 return (False, responses.ERROR_METADATA) 

1407 return (False, responses.ERROR_UNEXPECTED) 

1408 

1409 # B. Send block data in chunks 

1410 try: 

1411 # Next possible operations depending on the specific command 

1412 # and type of header we're sending 

1413 next_operations = [op_chunk, op_meta, ops.SUCCESS] 

1414 if command == self.CMD.ADVANCE: 

1415 next_operations.append(ops.PARTIAL) 

1416 if header_name == "block": 

1417 next_operations.append(ops.BROTHER_LIST_META) 

1418 if header_name == "brother": 

1419 next_operations.append(ops.HEADER_META) 

1420 

1421 response = self._send_data_in_chunks( 

1422 command=command, 

1423 operation=op_chunk, 

1424 next_operations=next_operations, 

1425 data=bytes.fromhex(block), 

1426 expect_full_data=False, 

1427 initial_bytes=bytes_requested, 

1428 operation_name=operation_name, 

1429 data_description=header_name, 

1430 ) 

1431 

1432 if not response[0]: 

1433 return (False, responses.ERROR_UNEXPECTED) 

1434 except HSM2DongleErrorResult as e: 

1435 self.logger.error( 

1436 "%s returned: %s", operation_name.capitalize(), hex(e.error_code) 

1437 ) 

1438 return ( 

1439 False, 

1440 chunk_error_mapping.get(e.error_code, responses.ERROR_UNEXPECTED), 

1441 ) 

1442 

1443 return response 

1444 

1445 # Send a specific piece of data in chunks to the device 

1446 # as the device requests bytes from it. 

1447 # Validate responses wrt current operation and next possible expected operations 

1448 # Exceptions are to be handled by the caller 

1449 def _send_data_in_chunks( 

1450 self, 

1451 command, 

1452 operation, 

1453 next_operations, 

1454 data, 

1455 expect_full_data, 

1456 initial_bytes, 

1457 operation_name, 

1458 data_description, 

1459 ): 

1460 offset = 0 

1461 bytes_requested = initial_bytes 

1462 total_bytes_sent = 0 

1463 finished = False 

1464 while not finished: 

1465 to_send = data[offset:offset + bytes_requested] 

1466 to_send_length = len(to_send) 

1467 self.logger.debug( 

1468 "%s: sending %s chunk [%d:%d] - %s", 

1469 operation_name.capitalize(), 

1470 data_description, 

1471 offset, 

1472 offset + to_send_length, 

1473 to_send.hex(), 

1474 ) 

1475 response = self._send_command(command, bytes([operation]) + to_send) 

1476 

1477 # Increase count and buffer pointer 

1478 total_bytes_sent += to_send_length 

1479 offset += to_send_length 

1480 

1481 # We expect the device to either ask for the current or for the 

1482 # next operation. 

1483 # If none of this happens, error out 

1484 if response[self.OFF.OP] not in ([operation] + next_operations): 

1485 self.logger.debug( 

1486 "Current operation %s, next operations %s, ledger requesting %s", 

1487 hex(operation), 

1488 str(list(map(hex, next_operations))), 

1489 hex(response[2]), 

1490 ) 

1491 self.logger.error( 

1492 "%s: unexpected response %s", 

1493 operation_name.capitalize(), 

1494 response.hex(), 

1495 ) 

1496 return (False, response) 

1497 

1498 # We finish when the device requests the next piece of data 

1499 finished = response[self.OFF.OP] != operation 

1500 

1501 # Have we finished but not sent all data when required to do so? 

1502 if expect_full_data and finished and total_bytes_sent < len(data): 

1503 self.logger.error( 

1504 "%s: expected to send all %d data bytes but sent %d and got %s", 

1505 operation_name.capitalize(), 

1506 len(data), 

1507 total_bytes_sent, 

1508 response.hex() 

1509 ) 

1510 return (False, response) 

1511 

1512 # How many bytes to send in the next message 

1513 if not finished: 

1514 bytes_requested = response[self.OFF.DATA] 

1515 self.logger.debug("Dongle requested %d bytes", bytes_requested) 

1516 

1517 # All is good 

1518 return (True, response)