Coverage for ledger/protocol.py: 76%

299 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23import time 

24from comm.protocol import HSM2Protocol, HSM2ProtocolError, HSM2ProtocolInterrupt 

25from comm.platform import Platform 

26from ledger.hsm2dongle import ( 

27 HSM2Dongle, 

28 HSM2DongleBaseError, 

29 HSM2DongleError, 

30 HSM2DongleErrorResult, 

31 HSM2DongleTimeoutError, 

32 HSM2DongleCommError, 

33 HSM2FirmwareVersion, 

34 SighashComputationMode, 

35) 

36from comm.bitcoin import get_unsigned_tx, get_tx_hash 

37 

38 

39class HSM2ProtocolLedger(HSM2Protocol): 

40 # Current manager supported versions for HSM UI and HSM SIGNER (<=) 

41 UI_VERSION = HSM2FirmwareVersion(5, 5, 1) 

42 APP_VERSION = HSM2FirmwareVersion(5, 5, 1) 

43 

44 # Amount of time to wait to make sure the app is opened 

45 OPEN_APP_WAIT = 1 # second 

46 

47 # Required minimum number of pin retries available to proceed with unlocking 

48 MIN_AVAILABLE_RETRIES = 2 

49 

50 def __init__(self, pin, dongle): 

51 super().__init__() 

52 self.hsm2dongle = dongle 

53 self._comm_issue = False 

54 self.pin = pin 

55 

56 def initialize_device(self): 

57 # Connection 

58 try: 

59 self.logger.info("Connecting to dongle") 

60 self.hsm2dongle.connect() 

61 self.logger.info("Connected to dongle") 

62 except HSM2DongleBaseError as e: 

63 self.logger.error(e) 

64 raise HSM2ProtocolError(e) 

65 

66 # Onboard check 

67 try: 

68 is_onboarded = self.hsm2dongle.is_onboarded() 

69 self.logger.info("Dongle onboarded: %s", "yes" if is_onboarded else "no") 

70 if not is_onboarded: 

71 self.logger.error("Dongle not onboarded, exiting") 

72 raise HSM2ProtocolError("Dongle not onboarded") 

73 except HSM2DongleBaseError: 

74 self.logger.info( 

75 "Could not determine onboarded status. If unlocked, " 

76 + "please enter the signing app and rerun the manager. Otherwise," 

77 + f"{Platform.message('restart')} and try again" 

78 ) 

79 raise HSM2ProtocolInterrupt() 

80 

81 # Mode check 

82 self.logger.info("Finding mode") 

83 current_mode = self.hsm2dongle.get_current_mode() 

84 self.logger.debug("Mode #%s", current_mode) 

85 

86 if current_mode == HSM2Dongle.MODE.BOOTLOADER: 

87 self._handle_bootloader() 

88 

89 # After handling the bootloader, we need to reload the mode since 

90 # it should change 

91 self.logger.info("Finding mode") 

92 current_mode = self.hsm2dongle.get_current_mode() 

93 self.logger.debug("Mode #%s", current_mode) 

94 

95 # In this point, the mode should be signer. 

96 # Otherwise, we tell the user to manually enter the signer and run 

97 # the manager again 

98 if current_mode != HSM2Dongle.MODE.SIGNER: 

99 self.logger.info( 

100 "Dongle mode unknown. Please manually enter the signer " 

101 "and re-run the manager" 

102 ) 

103 raise HSM2ProtocolInterrupt() 

104 

105 self.logger.info("Mode: Signing App") 

106 

107 # Verify that the app's version is correct 

108 self._dongle_app_version = self.hsm2dongle.get_version() 

109 self._check_version(self._dongle_app_version, self.APP_VERSION, "App") 

110 

111 # Get and report signer parameters 

112 signer_parameters = self.hsm2dongle.get_signer_parameters() 

113 self.logger.info("Gathered signer parameters") 

114 self.logger.info("Checkpoint 0x%s", signer_parameters.checkpoint) 

115 self.logger.info( 

116 "Minimum required difficulty %s", 

117 hex(signer_parameters.min_required_difficulty), 

118 ) 

119 self.logger.info("Network %s", signer_parameters.network.name) 

120 

121 def report_comm_issue(self): 

122 self._comm_issue = True 

123 

124 def ensure_connection(self): 

125 if not self._comm_issue: 

126 return 

127 

128 # Attempt to reconnect 

129 self.logger.info("Attempting dongle reconnection") 

130 self.hsm2dongle.disconnect() 

131 try: 

132 self.initialize_device() 

133 self._comm_issue = False 

134 self.logger.info("Reconnection successful") 

135 except HSM2ProtocolError as e: 

136 # Capture any initialization issues 

137 # (which would include communication problems, 

138 # such as failure to connect) and bubble them 

139 # as a dongle communication error, so that 

140 # the reply to the client will be appropiate and 

141 # a subsequent reconnection will be attempted 

142 # upon the next command 

143 # (don't log anything here, initialize_device will have done so) 

144 raise HSM2DongleCommError("While attempting to reconnect: %s", str(e)) 

145 

146 # Do what needs to be done to get past the "bootloader" mode 

147 # That includes checking the bootloader (UI) version, testing echo, 

148 # unlocking the device and (potentially) changing the device PIN 

149 # Finally, exiting the bootloader which should take the user to the 

150 # signer app. 

151 def _handle_bootloader(self): 

152 self.logger.info("Mode: Bootloader") 

153 

154 # Version check 

155 self._dongle_ui_version = self.hsm2dongle.get_version() 

156 self._check_version(self._dongle_ui_version, self.UI_VERSION, "UI") 

157 

158 # Echo check 

159 self.logger.info("Sending echo") 

160 if not self.hsm2dongle.echo(): 

161 self._error("Echo error") 

162 self.logger.info("Echo OK") 

163 

164 # Get the number of retries available to unlock the device 

165 # Then, only proceed if there is more than the minimum available 

166 # retries required (otherwise we risk wiping the device) 

167 try: 

168 self.logger.info("Retrieving available pin retries") 

169 retries = self.hsm2dongle.get_retries() 

170 self.logger.info("Available pin retries: %d", retries) 

171 if retries < self.MIN_AVAILABLE_RETRIES: 

172 self.logger.error( 

173 "Available number of pin retries (%d) not enough " 

174 "to attempt a device unlock. Aborting.", 

175 retries, 

176 ) 

177 raise HSM2ProtocolInterrupt() 

178 except HSM2DongleBaseError as e: 

179 self.logger.error( 

180 "While trying to get number of pin retries: %s. Aborting.", str(e) 

181 ) 

182 raise HSM2ProtocolInterrupt() 

183 

184 # Unlock device with PIN 

185 self.logger.info("Unlocking with PIN") 

186 if not self.hsm2dongle.unlock(self.pin.get_pin()): 

187 self._error("Unable to unlock: PIN mismatch") 

188 self.logger.info("PIN accepted") 

189 

190 # First PIN use check 

191 if self.pin.needs_change(): 

192 try: 

193 self.logger.info("PIN change need detected. Generating and changing PIN") 

194 self.pin.start_change() 

195 self.logger.info("Sending new PIN") 

196 if not self.hsm2dongle.new_pin(self.pin.get_new_pin()): 

197 raise Exception("Dongle reported fail to change pin. Pin invalid?") 

198 self.pin.commit_change() 

199 self.logger.info( 

200 f"PIN changed. Please {Platform.message('restart')}" 

201 ) 

202 except Exception as e: 

203 self.pin.abort_change() 

204 self.logger.error( 

205 f"Error changing PIN: %s. Please {Platform.message('restart')} " 

206 "and try again", format(e), 

207 ) 

208 finally: 

209 raise HSM2ProtocolInterrupt() 

210 

211 # This loads the app if in bootloader mode 

212 # This usually fails with a timeout, but its fine cause 

213 # what happens is that when opening the app, 

214 # the ledger disconnects from usb and reconnects 

215 self.logger.info("Loading Signing app") 

216 try: 

217 self.hsm2dongle.exit_menu() 

218 except Exception: 

219 # exit_menu() always throws due to USB disconnection. we don't care 

220 pass 

221 

222 self._wait_and_reconnect() 

223 

224 def _wait_and_reconnect(self): 

225 # Wait a little bit to make sure the app is loaded 

226 # and then reconnect to the dongle 

227 time.sleep(self.OPEN_APP_WAIT) 

228 self.hsm2dongle.disconnect() 

229 self.hsm2dongle.connect() 

230 

231 def _check_version(self, fware_version, mware_version, name): 

232 self.logger.info( 

233 "%s version: %s (supported <= %s)", name, fware_version, mware_version 

234 ) 

235 if not mware_version.supports(fware_version): 

236 self._error( 

237 "Unsupported %s version: Dongle reports %s, Node needs <= %s" 

238 % (name, fware_version, mware_version) 

239 ) 

240 

241 def _error(self, msg): 

242 self.logger.error(msg) 

243 raise HSM2ProtocolError(msg) 

244 

245 def _get_pubkey(self, request): 

246 try: 

247 self.ensure_connection() 

248 return ( 

249 self.ERROR_CODE_OK, 

250 {"pubKey": self.hsm2dongle.get_public_key(request["keyId"])}, 

251 ) 

252 except HSM2DongleErrorResult: 

253 return (self.ERROR_CODE_INVALID_KEYID,) 

254 except HSM2DongleTimeoutError: 

255 self.logger.error("Dongle timeout getting public key") 

256 return (self.ERROR_CODE_DEVICE,) 

257 except HSM2DongleCommError: 

258 # Signal a communication problem and return a device error 

259 self._comm_issue = True 

260 self.logger.error("Dongle communication error getting public key") 

261 return (self.ERROR_CODE_DEVICE,) 

262 except HSM2DongleError as e: 

263 return self._error("Dongle error in get_pubkey: %s" % str(e)) 

264 

265 def _sign(self, request): 

266 # First validate the required fields are OK 

267 if "message" in request and "hash" in request["message"]: 

268 # Unauthorized signing 

269 message_validation = self._validate_message(request, what="hash") 

270 if message_validation < self.ERROR_CODE_OK: 

271 return (message_validation,) 

272 

273 try: 

274 self.ensure_connection() 

275 sign_result = self.hsm2dongle.sign_unauthorized( 

276 key_id=request["keyId"], hash=request["message"]["hash"] 

277 ) 

278 except HSM2DongleTimeoutError: 

279 self.logger.error("Dongle timeout signing") 

280 return (self.ERROR_CODE_DEVICE,) 

281 except HSM2DongleCommError: 

282 # Signal a communication problem and return a device error 

283 self._comm_issue = True 

284 self.logger.error("Dongle communication error signing") 

285 return (self.ERROR_CODE_DEVICE,) 

286 except HSM2DongleError as e: 

287 self._error("Dongle error in sign: %s" % str(e)) 

288 else: 

289 # Authorized signing 

290 auth_validation = self._validate_auth(request, mandatory=True) 

291 if auth_validation < self.ERROR_CODE_OK: 

292 return (auth_validation,) 

293 

294 message_validation = self._validate_message(request, what="tx") 

295 if message_validation < self.ERROR_CODE_OK: 

296 return (message_validation,) 

297 

298 # Shorthand 

299 msg = request["message"] 

300 

301 # Make sure the transaction 

302 # is fully unsigned before sending. 

303 try: 

304 unsigned_btc_tx = get_unsigned_tx(msg["tx"]) 

305 self.logger.debug("Unsigned BTC tx: %s", get_tx_hash(unsigned_btc_tx)) 

306 except Exception as e: 

307 self.logger.error("Error unsigning BTC tx: %s", str(e)) 

308 return (self.ERROR_CODE_INVALID_MESSAGE,) 

309 

310 try: 

311 self.ensure_connection() 

312 sign_result = self.hsm2dongle.sign_authorized( 

313 key_id=request["keyId"], 

314 rsk_tx_receipt=request["auth"]["receipt"], 

315 receipt_merkle_proof=request["auth"]["receipt_merkle_proof"], 

316 btc_tx=unsigned_btc_tx, 

317 input_index=msg["input"], 

318 sighash_computation_mode=SighashComputationMode( 

319 msg["sighashComputationMode"]), 

320 witness_script=msg.get("witnessScript"), 

321 outpoint_value=msg.get("outpointValue") 

322 ) 

323 except HSM2DongleTimeoutError: 

324 self.logger.error("Dongle timeout signing") 

325 return (self.ERROR_CODE_DEVICE,) 

326 except HSM2DongleCommError: 

327 # Signal a communication problem and return a device error 

328 self._comm_issue = True 

329 self.logger.error("Dongle communication error signing") 

330 return (self.ERROR_CODE_DEVICE,) 

331 except HSM2DongleError as e: 

332 self._error("Dongle error in sign: %s" % str(e)) 

333 

334 # Signing result is the same for both authorized and non authorized signing 

335 if not sign_result[0]: 

336 return (self._translate_sign_error(sign_result[1]),) 

337 signature = sign_result[1] 

338 

339 return (self.ERROR_CODE_OK, {"signature": {"r": signature.r, "s": signature.s}}) 

340 

341 def _translate_sign_error(self, error_code): 

342 return ( 

343 { 

344 HSM2Dongle.RESPONSE.SIGN.ERROR_PATH: self.ERROR_CODE_INVALID_KEYID, 

345 HSM2Dongle.RESPONSE.SIGN.ERROR_BTC_TX: self.ERROR_CODE_INVALID_MESSAGE, 

346 HSM2Dongle.RESPONSE.SIGN.ERROR_TX_RECEIPT: self.ERROR_CODE_INVALID_AUTH, 

347 HSM2Dongle.RESPONSE.SIGN.ERROR_MERKLE_PROOF: self.ERROR_CODE_INVALID_AUTH, 

348 HSM2Dongle.RESPONSE.SIGN.ERROR_HASH: self.ERROR_CODE_INVALID_MESSAGE, 

349 HSM2Dongle.RESPONSE.SIGN.ERROR_UNEXPECTED: self.ERROR_CODE_DEVICE, 

350 } 

351 ).get(error_code, self.ERROR_CODE_UNKNOWN) 

352 

353 def _blockchain_state(self, request): 

354 try: 

355 self.ensure_connection() 

356 state = self.hsm2dongle.get_blockchain_state() 

357 except (HSM2DongleError, HSM2DongleTimeoutError) as e: 

358 self.logger.error("Dongle error getting blockchain state: %s", str(e)) 

359 return (self.ERROR_CODE_DEVICE,) 

360 except HSM2DongleCommError: 

361 # Signal a communication problem and return a device error 

362 self._comm_issue = True 

363 self.logger.error("Dongle communication error getting blockchain state") 

364 return (self.ERROR_CODE_DEVICE,) 

365 

366 state_result = { 

367 "best_block": state["best_block"], 

368 "newest_valid_block": state["newest_valid_block"], 

369 "ancestor_block": state["ancestor_block"], 

370 "ancestor_receipts_root": state["ancestor_receipts_root"], 

371 "updating": { 

372 "best_block": state["updating.best_block"], 

373 "newest_valid_block": state["updating.newest_valid_block"], 

374 "next_expected_block": state["updating.next_expected_block"], 

375 "total_difficulty": state["updating.total_difficulty"], 

376 "in_progress": state["updating.in_progress"], 

377 "already_validated": state["updating.already_validated"], 

378 "found_best_block": state["updating.found_best_block"], 

379 }, 

380 } 

381 

382 return (self.ERROR_CODE_OK, {"state": state_result}) 

383 

384 def _reset_advance_blockchain(self, request): 

385 try: 

386 self.ensure_connection() 

387 self.hsm2dongle.reset_advance_blockchain() 

388 except (HSM2DongleError, HSM2DongleTimeoutError) as e: 

389 self.logger.error("Dongle error resetting advance blockchain: %s", str(e)) 

390 return (self.ERROR_CODE_DEVICE,) 

391 except HSM2DongleCommError: 

392 # Signal a communication problem and return a device error 

393 self._comm_issue = True 

394 self.logger.error("Dongle communication error resetting advance blockchain") 

395 return (self.ERROR_CODE_DEVICE,) 

396 

397 return (self.ERROR_CODE_OK, {}) 

398 

399 def _advance_blockchain(self, request): 

400 try: 

401 self.ensure_connection() 

402 advance_result = self.hsm2dongle.advance_blockchain( 

403 request["blocks"], request["brothers"] 

404 ) 

405 return (self._translate_advance_result(advance_result[1]), {}) 

406 except (HSM2DongleError, HSM2DongleTimeoutError) as e: 

407 self.logger.error("Dongle error in advance blockchain: %s", str(e)) 

408 return (self.ERROR_CODE_DEVICE,) 

409 except HSM2DongleCommError: 

410 # Signal a communication problem and return a device error 

411 self._comm_issue = True 

412 self.logger.error("Dongle communication error in advance blockchain") 

413 return (self.ERROR_CODE_DEVICE,) 

414 

415 def _translate_advance_result(self, result): 

416 DERR = HSM2Dongle.RESPONSE.ADVANCE 

417 return ({ 

418 DERR.OK_TOTAL: self.ERROR_CODE_OK, 

419 DERR.OK_PARTIAL: self.ERROR_CODE_OK_PARTIAL, 

420 DERR.ERROR_INIT: self.ERROR_CODE_DEVICE, 

421 DERR.ERROR_COMPUTE_METADATA: self.ERROR_CODE_INVALID_INPUT_BLOCKS, # noqa E501 

422 DERR.ERROR_METADATA: self.ERROR_CODE_DEVICE, 

423 DERR.ERROR_BLOCK_DATA: self.ERROR_CODE_DEVICE, 

424 DERR.ERROR_INVALID_BLOCK: self.ERROR_CODE_INVALID_INPUT_BLOCKS, # noqa E501 

425 DERR.ERROR_POW_INVALID: self.ERROR_CODE_POW_INVALID, 

426 DERR.ERROR_CHAINING_MISMATCH: self.ERROR_CODE_CHAINING_MISMATCH, # noqa E501 

427 DERR.ERROR_UNSUPPORTED_CHAIN: self.ERROR_CODE_INVALID_INPUT_BLOCKS, # noqa E501 

428 DERR.ERROR_INVALID_BROTHERS: self.ERROR_CODE_INVALID_BROTHERS, # noqa E501 

429 DERR.ERROR_UNEXPECTED: self.ERROR_CODE_UNKNOWN, 

430 }).get(result, self.ERROR_CODE_UNKNOWN) 

431 

432 def _update_ancestor_block(self, request): 

433 try: 

434 self.ensure_connection() 

435 update_result = self.hsm2dongle.update_ancestor(request["blocks"]) 

436 return (self._translate_update_ancestor_result(update_result[1]), {}) 

437 except (HSM2DongleError, HSM2DongleTimeoutError) as e: 

438 self.logger.error("Dongle error in update ancestor: %s", str(e)) 

439 return (self.ERROR_CODE_DEVICE,) 

440 except HSM2DongleCommError: 

441 # Signal a communication problem and return a device error 

442 self._comm_issue = True 

443 self.logger.error("Dongle communication error in update ancestor") 

444 return (self.ERROR_CODE_DEVICE,) 

445 

446 def _translate_update_ancestor_result(self, result): 

447 return ({ 

448 HSM2Dongle.RESPONSE.UPD_ANCESTOR.OK_TOTAL: self.ERROR_CODE_OK, 

449 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_INIT: self.ERROR_CODE_DEVICE, 

450 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_COMPUTE_METADATA: self.ERROR_CODE_INVALID_INPUT_BLOCKS, # noqa E501 

451 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_METADATA: self.ERROR_CODE_DEVICE, 

452 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_BLOCK_DATA: self.ERROR_CODE_DEVICE, 

453 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_INVALID_BLOCK: self.ERROR_CODE_INVALID_INPUT_BLOCKS, # noqa E501 

454 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_CHAINING_MISMATCH: self.ERROR_CODE_CHAINING_MISMATCH, # noqa E501 

455 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_TIP_MISMATCH: self.ERROR_CODE_TIP_MISMATCH, # noqa E501 

456 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_REMOVE_MM_FIELDS: self.ERROR_CODE_INVALID_INPUT_BLOCKS, # noqa E501 

457 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_UNEXPECTED: self.ERROR_CODE_UNKNOWN, 

458 }).get(result, self.ERROR_CODE_UNKNOWN) 

459 

460 def _get_blockchain_parameters(self, request): 

461 try: 

462 self.ensure_connection() 

463 params = self.hsm2dongle.get_signer_parameters() 

464 return (self.ERROR_CODE_OK, {"parameters": { 

465 "checkpoint": params.checkpoint, 

466 "minimum_difficulty": params.min_required_difficulty, 

467 "network": params.network.name.lower()} 

468 }) 

469 except (HSM2DongleError, HSM2DongleTimeoutError) as e: 

470 self.logger.error("Dongle error in get parameters: %s", str(e)) 

471 return (self.ERROR_CODE_DEVICE,) 

472 except HSM2DongleCommError: 

473 # Signal a communication problem and return a device error 

474 self._comm_issue = True 

475 self.logger.error("Dongle communication error in get parameters") 

476 return (self.ERROR_CODE_DEVICE,) 

477 

478 def _signer_heartbeat(self, request): 

479 try: 

480 self.ensure_connection() 

481 

482 heartbeat = self.hsm2dongle.get_signer_heartbeat(request["udValue"]) 

483 # Treat any user-errors as a device (unexpected) error 

484 if not heartbeat[0]: 

485 return (self.ERROR_CODE_DEVICE,) 

486 heartbeat = heartbeat[1] 

487 

488 return (self.ERROR_CODE_OK, { 

489 "pubKey": heartbeat["pubKey"], 

490 "message": heartbeat["message"], 

491 "tweak": heartbeat["tweak"], 

492 "signature": { 

493 "r": heartbeat["signature"].r, 

494 "s": heartbeat["signature"].s 

495 } 

496 }) 

497 except (HSM2DongleError, HSM2DongleTimeoutError) as e: 

498 self.logger.error("Dongle error in signer heartbeat: %s", str(e)) 

499 return (self.ERROR_CODE_DEVICE,) 

500 except HSM2DongleCommError: 

501 # Signal a communication problem and return a device error 

502 self._comm_issue = True 

503 self.logger.error("Dongle communication error in signer heartbeat") 

504 return (self.ERROR_CODE_DEVICE,) 

505 

506 def _ui_heartbeat(self, request): 

507 try: 

508 self.ensure_connection() 

509 

510 # Check the current mode 

511 initial_mode = self.hsm2dongle.get_current_mode() 

512 

513 # Can only gather the UI heartbeat from either the Signer or 

514 # the UI heartbeat mode itself 

515 if initial_mode not in [self.hsm2dongle.MODE.SIGNER, 

516 self.hsm2dongle.MODE.UI_HEARTBEAT]: 

517 self.logger.error("Dongle not in Signer or UI heartbeat mode when" 

518 " trying to gather UI heartbeat") 

519 return (self.ERROR_CODE_DEVICE,) 

520 

521 # Exit the signer 

522 if initial_mode == self.hsm2dongle.MODE.SIGNER: 

523 # This should raise a communication error due to USB 

524 # disconnection. Treat as successful 

525 try: 

526 self.hsm2dongle.exit_app() 

527 except HSM2DongleCommError: 

528 pass 

529 self._wait_and_reconnect() 

530 # Check we are now in UI heartbeat mode 

531 new_mode = self.hsm2dongle.get_current_mode() 

532 if new_mode != self.hsm2dongle.MODE.UI_HEARTBEAT: 

533 self.logger.error("Expected dongle to be in UI heartbeat" 

534 f" mode but got {new_mode}") 

535 return (self.ERROR_CODE_DEVICE,) 

536 

537 # Gather the heartbeat and immediately try to go back 

538 # to the signer. Deal with the heartbeat result later. 

539 heartbeat = self.hsm2dongle.get_ui_heartbeat(request["udValue"]) 

540 

541 # Exit the UI heartbeat to return to the signer 

542 if initial_mode == self.hsm2dongle.MODE.SIGNER: 

543 # This should raise a communication error due to USB 

544 # disconnection. Treat as successful 

545 try: 

546 self.hsm2dongle.exit_app() 

547 except HSM2DongleCommError: 

548 pass 

549 self._wait_and_reconnect() 

550 # Check we are now back in the Signer 

551 new_mode = self.hsm2dongle.get_current_mode() 

552 if new_mode != self.hsm2dongle.MODE.SIGNER: 

553 self.logger.error("Expected dongle to be in Signer" 

554 f" mode but got {new_mode}") 

555 return (self.ERROR_CODE_DEVICE,) 

556 

557 # Treat any user-errors as a device (unexpected) error 

558 if not heartbeat[0]: 

559 return (self.ERROR_CODE_DEVICE,) 

560 heartbeat = heartbeat[1] 

561 

562 return (self.ERROR_CODE_OK, { 

563 "pubKey": heartbeat["pubKey"], 

564 "message": heartbeat["message"], 

565 "tweak": heartbeat["tweak"], 

566 "signature": { 

567 "r": heartbeat["signature"].r, 

568 "s": heartbeat["signature"].s 

569 } 

570 }) 

571 except (HSM2DongleError, HSM2DongleTimeoutError) as e: 

572 self.logger.error("Dongle error in UI heartbeat: %s", str(e)) 

573 return (self.ERROR_CODE_DEVICE,) 

574 except HSM2DongleCommError: 

575 # Signal a communication problem and return a device error 

576 self._comm_issue = True 

577 self.logger.error("Dongle communication error in UI heartbeat") 

578 return (self.ERROR_CODE_DEVICE,)