Coverage for ledger/protocol.py: 76%

298 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-05 20:41 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23import time 

24from comm.protocol import HSM2Protocol, HSM2ProtocolError, HSM2ProtocolInterrupt 

25from ledger.hsm2dongle import ( 

26 HSM2Dongle, 

27 HSM2DongleBaseError, 

28 HSM2DongleError, 

29 HSM2DongleErrorResult, 

30 HSM2DongleTimeoutError, 

31 HSM2DongleCommError, 

32 HSM2FirmwareVersion, 

33 SighashComputationMode, 

34) 

35from comm.bitcoin import get_unsigned_tx, get_tx_hash 

36 

37 

38class HSM2ProtocolLedger(HSM2Protocol): 

39 # Current manager supported versions for HSM UI and HSM SIGNER (<=) 

40 UI_VERSION = HSM2FirmwareVersion(4, 0, 2) 

41 APP_VERSION = HSM2FirmwareVersion(4, 0, 2) 

42 

43 # Amount of time to wait to make sure the app is opened 

44 OPEN_APP_WAIT = 1 # second 

45 

46 # Required minimum number of pin retries available to proceed with unlocking 

47 MIN_AVAILABLE_RETRIES = 2 

48 

49 def __init__(self, pin, dongle): 

50 super().__init__() 

51 self.hsm2dongle = dongle 

52 self._comm_issue = False 

53 self.pin = pin 

54 

55 def initialize_device(self): 

56 # Connection 

57 try: 

58 self.logger.info("Connecting to dongle") 

59 self.hsm2dongle.connect() 

60 self.logger.info("Connected to dongle") 

61 except HSM2DongleBaseError as e: 

62 self.logger.error(e) 

63 raise HSM2ProtocolError(e) 

64 

65 # Onboard check 

66 try: 

67 is_onboarded = self.hsm2dongle.is_onboarded() 

68 self.logger.info("Dongle onboarded: %s", "yes" if is_onboarded else "no") 

69 if not is_onboarded: 

70 self.logger.error("Dongle not onboarded, exiting") 

71 raise HSM2ProtocolError("Dongle not onboarded") 

72 except HSM2DongleBaseError: 

73 self.logger.info( 

74 "Could not determine onboarded status. If unlocked, " 

75 + "please enter the signing app and rerun the manager. Otherwise," 

76 + "disconnect and reconnect the ledger nano and try again" 

77 ) 

78 raise HSM2ProtocolInterrupt() 

79 

80 # Mode check 

81 self.logger.info("Finding mode") 

82 current_mode = self.hsm2dongle.get_current_mode() 

83 self.logger.debug("Mode #%s", current_mode) 

84 

85 if current_mode == HSM2Dongle.MODE.BOOTLOADER: 

86 self._handle_bootloader() 

87 

88 # After handling the bootloader, we need to reload the mode since 

89 # it should change 

90 self.logger.info("Finding mode") 

91 current_mode = self.hsm2dongle.get_current_mode() 

92 self.logger.debug("Mode #%s", current_mode) 

93 

94 # In this point, the mode should be signer. 

95 # Otherwise, we tell the user to manually enter the signer and run 

96 # the manager again 

97 if current_mode != HSM2Dongle.MODE.SIGNER: 

98 self.logger.info( 

99 "Dongle mode unknown. Please manually enter the signer " 

100 "and re-run the manager" 

101 ) 

102 raise HSM2ProtocolInterrupt() 

103 

104 self.logger.info("Mode: Signing App") 

105 

106 # Verify that the app's version is correct 

107 self._dongle_app_version = self.hsm2dongle.get_version() 

108 self._check_version(self._dongle_app_version, self.APP_VERSION, "App") 

109 

110 # Get and report signer parameters 

111 signer_parameters = self.hsm2dongle.get_signer_parameters() 

112 self.logger.info("Gathered signer parameters") 

113 self.logger.info("Checkpoint 0x%s", signer_parameters.checkpoint) 

114 self.logger.info( 

115 "Minimum required difficulty %s", 

116 hex(signer_parameters.min_required_difficulty), 

117 ) 

118 self.logger.info("Network %s", signer_parameters.network.name) 

119 

120 def report_comm_issue(self): 

121 self._comm_issue = True 

122 

123 def ensure_connection(self): 

124 if not self._comm_issue: 

125 return 

126 

127 # Attempt to reconnect 

128 self.logger.info("Attempting dongle reconnection") 

129 self.hsm2dongle.disconnect() 

130 try: 

131 self.initialize_device() 

132 self._comm_issue = False 

133 self.logger.info("Reconnection successful") 

134 except HSM2ProtocolError as e: 

135 # Capture any initialization issues 

136 # (which would include communication problems, 

137 # such as failure to connect) and bubble them 

138 # as a dongle communication error, so that 

139 # the reply to the client will be appropiate and 

140 # a subsequent reconnection will be attempted 

141 # upon the next command 

142 # (don't log anything here, initialize_device will have done so) 

143 raise HSM2DongleCommError("While attempting to reconnect: %s", str(e)) 

144 

145 # Do what needs to be done to get past the "bootloader" mode 

146 # That includes checking the bootloader (UI) version, testing echo, 

147 # unlocking the device and (potentially) changing the device PIN 

148 # Finally, exiting the bootloader which should take the user to the 

149 # signer app. 

150 def _handle_bootloader(self): 

151 self.logger.info("Mode: Bootloader") 

152 

153 # Version check 

154 self._dongle_ui_version = self.hsm2dongle.get_version() 

155 self._check_version(self._dongle_ui_version, self.UI_VERSION, "UI") 

156 

157 # Echo check 

158 self.logger.info("Sending echo") 

159 if not self.hsm2dongle.echo(): 

160 self._error("Echo error") 

161 self.logger.info("Echo OK") 

162 

163 # Get the number of retries available to unlock the device 

164 # Then, only proceed if there is more than the minimum available 

165 # retries required (otherwise we risk wiping the device) 

166 try: 

167 self.logger.info("Retrieving available pin retries") 

168 retries = self.hsm2dongle.get_retries() 

169 self.logger.info("Available pin retries: %d", retries) 

170 if retries < self.MIN_AVAILABLE_RETRIES: 

171 self.logger.error( 

172 "Available number of pin retries (%d) not enough " 

173 "to attempt a device unlock. Aborting.", 

174 retries, 

175 ) 

176 raise HSM2ProtocolInterrupt() 

177 except HSM2DongleBaseError as e: 

178 self.logger.error( 

179 "While trying to get number of pin retries: %s. Aborting.", str(e) 

180 ) 

181 raise HSM2ProtocolInterrupt() 

182 

183 # Unlock device with PIN 

184 self.logger.info("Unlocking with PIN") 

185 if not self.hsm2dongle.unlock(self.pin.get_pin()): 

186 self._error("Unable to unlock: PIN mismatch") 

187 self.logger.info("PIN accepted") 

188 

189 # First PIN use check 

190 if self.pin.needs_change(): 

191 try: 

192 self.logger.info("PIN change need detected. Generating and changing PIN") 

193 self.pin.start_change() 

194 self.logger.info("Sending new PIN") 

195 if not self.hsm2dongle.new_pin(self.pin.get_new_pin()): 

196 raise Exception("Dongle reported fail to change pin. Pin invalid?") 

197 self.pin.commit_change() 

198 self.logger.info( 

199 "PIN changed. Please disconnect and reconnect the ledger nano" 

200 ) 

201 except Exception as e: 

202 self.pin.abort_change() 

203 self.logger.error( 

204 "Error changing PIN: %s. Please disconnect and " 

205 "reconnect the ledger nano and try again", 

206 format(e), 

207 ) 

208 finally: 

209 raise HSM2ProtocolInterrupt() 

210 

211 # This loads the app if in bootloader mode 

212 # This usually fails with a timeout, but its fine cause 

213 # what happens is that when opening the app, 

214 # the ledger disconnects from usb and reconnects 

215 self.logger.info("Loading Signing app") 

216 try: 

217 self.hsm2dongle.exit_menu() 

218 except Exception: 

219 # exit_menu() always throws due to USB disconnection. we don't care 

220 pass 

221 

222 self._wait_and_reconnect() 

223 

224 def _wait_and_reconnect(self): 

225 # Wait a little bit to make sure the app is loaded 

226 # and then reconnect to the dongle 

227 time.sleep(self.OPEN_APP_WAIT) 

228 self.hsm2dongle.disconnect() 

229 self.hsm2dongle.connect() 

230 

231 def _check_version(self, fware_version, mware_version, name): 

232 self.logger.info( 

233 "%s version: %s (supported <= %s)", name, fware_version, mware_version 

234 ) 

235 if not mware_version.supports(fware_version): 

236 self._error( 

237 "Unsupported %s version: Dongle reports %s, Node needs <= %s" 

238 % (name, fware_version, mware_version) 

239 ) 

240 

241 def _error(self, msg): 

242 self.logger.error(msg) 

243 raise HSM2ProtocolError(msg) 

244 

245 def _get_pubkey(self, request): 

246 try: 

247 self.ensure_connection() 

248 return ( 

249 self.ERROR_CODE_OK, 

250 {"pubKey": self.hsm2dongle.get_public_key(request["keyId"])}, 

251 ) 

252 except HSM2DongleErrorResult: 

253 return (self.ERROR_CODE_INVALID_KEYID,) 

254 except HSM2DongleTimeoutError: 

255 self.logger.error("Dongle timeout getting public key") 

256 return (self.ERROR_CODE_DEVICE,) 

257 except HSM2DongleCommError: 

258 # Signal a communication problem and return a device error 

259 self._comm_issue = True 

260 self.logger.error("Dongle communication error getting public key") 

261 return (self.ERROR_CODE_DEVICE,) 

262 except HSM2DongleError as e: 

263 return self._error("Dongle error in get_pubkey: %s" % str(e)) 

264 

265 def _sign(self, request): 

266 # First validate the required fields are OK 

267 if "message" in request and "hash" in request["message"]: 

268 # Unauthorized signing 

269 message_validation = self._validate_message(request, what="hash") 

270 if message_validation < self.ERROR_CODE_OK: 

271 return (message_validation,) 

272 

273 try: 

274 self.ensure_connection() 

275 sign_result = self.hsm2dongle.sign_unauthorized( 

276 key_id=request["keyId"], hash=request["message"]["hash"] 

277 ) 

278 except HSM2DongleTimeoutError: 

279 self.logger.error("Dongle timeout signing") 

280 return (self.ERROR_CODE_DEVICE,) 

281 except HSM2DongleCommError: 

282 # Signal a communication problem and return a device error 

283 self._comm_issue = True 

284 self.logger.error("Dongle communication error signing") 

285 return (self.ERROR_CODE_DEVICE,) 

286 except HSM2DongleError as e: 

287 self._error("Dongle error in sign: %s" % str(e)) 

288 else: 

289 # Authorized signing 

290 auth_validation = self._validate_auth(request, mandatory=True) 

291 if auth_validation < self.ERROR_CODE_OK: 

292 return (auth_validation,) 

293 

294 message_validation = self._validate_message(request, what="tx") 

295 if message_validation < self.ERROR_CODE_OK: 

296 return (message_validation,) 

297 

298 # Shorthand 

299 msg = request["message"] 

300 

301 # Make sure the transaction 

302 # is fully unsigned before sending. 

303 try: 

304 unsigned_btc_tx = get_unsigned_tx(msg["tx"]) 

305 self.logger.debug("Unsigned BTC tx: %s", get_tx_hash(unsigned_btc_tx)) 

306 except Exception as e: 

307 self.logger.error("Error unsigning BTC tx: %s", str(e)) 

308 return (self.ERROR_CODE_INVALID_MESSAGE,) 

309 

310 try: 

311 self.ensure_connection() 

312 sign_result = self.hsm2dongle.sign_authorized( 

313 key_id=request["keyId"], 

314 rsk_tx_receipt=request["auth"]["receipt"], 

315 receipt_merkle_proof=request["auth"]["receipt_merkle_proof"], 

316 btc_tx=unsigned_btc_tx, 

317 input_index=msg["input"], 

318 sighash_computation_mode=SighashComputationMode( 

319 msg["sighashComputationMode"]), 

320 witness_script=msg.get("witnessScript"), 

321 outpoint_value=msg.get("outpointValue") 

322 ) 

323 except HSM2DongleTimeoutError: 

324 self.logger.error("Dongle timeout signing") 

325 return (self.ERROR_CODE_DEVICE,) 

326 except HSM2DongleCommError: 

327 # Signal a communication problem and return a device error 

328 self._comm_issue = True 

329 self.logger.error("Dongle communication error signing") 

330 return (self.ERROR_CODE_DEVICE,) 

331 except HSM2DongleError as e: 

332 self._error("Dongle error in sign: %s" % str(e)) 

333 

334 # Signing result is the same for both authorized and non authorized signing 

335 if not sign_result[0]: 

336 return (self._translate_sign_error(sign_result[1]),) 

337 signature = sign_result[1] 

338 

339 return (self.ERROR_CODE_OK, {"signature": {"r": signature.r, "s": signature.s}}) 

340 

341 def _translate_sign_error(self, error_code): 

342 return ( 

343 { 

344 HSM2Dongle.RESPONSE.SIGN.ERROR_PATH: self.ERROR_CODE_INVALID_KEYID, 

345 HSM2Dongle.RESPONSE.SIGN.ERROR_BTC_TX: self.ERROR_CODE_INVALID_MESSAGE, 

346 HSM2Dongle.RESPONSE.SIGN.ERROR_TX_RECEIPT: self.ERROR_CODE_INVALID_AUTH, 

347 HSM2Dongle.RESPONSE.SIGN.ERROR_MERKLE_PROOF: self.ERROR_CODE_INVALID_AUTH, 

348 HSM2Dongle.RESPONSE.SIGN.ERROR_HASH: self.ERROR_CODE_INVALID_MESSAGE, 

349 HSM2Dongle.RESPONSE.SIGN.ERROR_UNEXPECTED: self.ERROR_CODE_DEVICE, 

350 } 

351 ).get(error_code, self.ERROR_CODE_UNKNOWN) 

352 

353 def _blockchain_state(self, request): 

354 try: 

355 self.ensure_connection() 

356 state = self.hsm2dongle.get_blockchain_state() 

357 except (HSM2DongleError, HSM2DongleTimeoutError) as e: 

358 self.logger.error("Dongle error getting blockchain state: %s", str(e)) 

359 return (self.ERROR_CODE_DEVICE,) 

360 except HSM2DongleCommError: 

361 # Signal a communication problem and return a device error 

362 self._comm_issue = True 

363 self.logger.error("Dongle communication error getting blockchain state") 

364 return (self.ERROR_CODE_DEVICE,) 

365 

366 state_result = { 

367 "best_block": state["best_block"], 

368 "newest_valid_block": state["newest_valid_block"], 

369 "ancestor_block": state["ancestor_block"], 

370 "ancestor_receipts_root": state["ancestor_receipts_root"], 

371 "updating": { 

372 "best_block": state["updating.best_block"], 

373 "newest_valid_block": state["updating.newest_valid_block"], 

374 "next_expected_block": state["updating.next_expected_block"], 

375 "total_difficulty": state["updating.total_difficulty"], 

376 "in_progress": state["updating.in_progress"], 

377 "already_validated": state["updating.already_validated"], 

378 "found_best_block": state["updating.found_best_block"], 

379 }, 

380 } 

381 

382 return (self.ERROR_CODE_OK, {"state": state_result}) 

383 

384 def _reset_advance_blockchain(self, request): 

385 try: 

386 self.ensure_connection() 

387 self.hsm2dongle.reset_advance_blockchain() 

388 except (HSM2DongleError, HSM2DongleTimeoutError) as e: 

389 self.logger.error("Dongle error resetting advance blockchain: %s", str(e)) 

390 return (self.ERROR_CODE_DEVICE,) 

391 except HSM2DongleCommError: 

392 # Signal a communication problem and return a device error 

393 self._comm_issue = True 

394 self.logger.error("Dongle communication error resetting advance blockchain") 

395 return (self.ERROR_CODE_DEVICE,) 

396 

397 return (self.ERROR_CODE_OK, {}) 

398 

399 def _advance_blockchain(self, request): 

400 try: 

401 self.ensure_connection() 

402 advance_result = self.hsm2dongle.advance_blockchain( 

403 request["blocks"], request["brothers"] 

404 ) 

405 return (self._translate_advance_result(advance_result[1]), {}) 

406 except (HSM2DongleError, HSM2DongleTimeoutError) as e: 

407 self.logger.error("Dongle error in advance blockchain: %s", str(e)) 

408 return (self.ERROR_CODE_DEVICE,) 

409 except HSM2DongleCommError: 

410 # Signal a communication problem and return a device error 

411 self._comm_issue = True 

412 self.logger.error("Dongle communication error in advance blockchain") 

413 return (self.ERROR_CODE_DEVICE,) 

414 

415 def _translate_advance_result(self, result): 

416 DERR = HSM2Dongle.RESPONSE.ADVANCE 

417 return ({ 

418 DERR.OK_TOTAL: self.ERROR_CODE_OK, 

419 DERR.OK_PARTIAL: self.ERROR_CODE_OK_PARTIAL, 

420 DERR.ERROR_INIT: self.ERROR_CODE_DEVICE, 

421 DERR.ERROR_COMPUTE_METADATA: self.ERROR_CODE_INVALID_INPUT_BLOCKS, # noqa E501 

422 DERR.ERROR_METADATA: self.ERROR_CODE_DEVICE, 

423 DERR.ERROR_BLOCK_DATA: self.ERROR_CODE_DEVICE, 

424 DERR.ERROR_INVALID_BLOCK: self.ERROR_CODE_INVALID_INPUT_BLOCKS, # noqa E501 

425 DERR.ERROR_POW_INVALID: self.ERROR_CODE_POW_INVALID, 

426 DERR.ERROR_CHAINING_MISMATCH: self.ERROR_CODE_CHAINING_MISMATCH, # noqa E501 

427 DERR.ERROR_UNSUPPORTED_CHAIN: self.ERROR_CODE_INVALID_INPUT_BLOCKS, # noqa E501 

428 DERR.ERROR_INVALID_BROTHERS: self.ERROR_CODE_INVALID_BROTHERS, # noqa E501 

429 DERR.ERROR_UNEXPECTED: self.ERROR_CODE_UNKNOWN, 

430 }).get(result, self.ERROR_CODE_UNKNOWN) 

431 

432 def _update_ancestor_block(self, request): 

433 try: 

434 self.ensure_connection() 

435 update_result = self.hsm2dongle.update_ancestor(request["blocks"]) 

436 return (self._translate_update_ancestor_result(update_result[1]), {}) 

437 except (HSM2DongleError, HSM2DongleTimeoutError) as e: 

438 self.logger.error("Dongle error in update ancestor: %s", str(e)) 

439 return (self.ERROR_CODE_DEVICE,) 

440 except HSM2DongleCommError: 

441 # Signal a communication problem and return a device error 

442 self._comm_issue = True 

443 self.logger.error("Dongle communication error in update ancestor") 

444 return (self.ERROR_CODE_DEVICE,) 

445 

446 def _translate_update_ancestor_result(self, result): 

447 return ({ 

448 HSM2Dongle.RESPONSE.UPD_ANCESTOR.OK_TOTAL: self.ERROR_CODE_OK, 

449 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_INIT: self.ERROR_CODE_DEVICE, 

450 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_COMPUTE_METADATA: self.ERROR_CODE_INVALID_INPUT_BLOCKS, # noqa E501 

451 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_METADATA: self.ERROR_CODE_DEVICE, 

452 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_BLOCK_DATA: self.ERROR_CODE_DEVICE, 

453 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_INVALID_BLOCK: self.ERROR_CODE_INVALID_INPUT_BLOCKS, # noqa E501 

454 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_CHAINING_MISMATCH: self.ERROR_CODE_CHAINING_MISMATCH, # noqa E501 

455 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_TIP_MISMATCH: self.ERROR_CODE_TIP_MISMATCH, # noqa E501 

456 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_REMOVE_MM_FIELDS: self.ERROR_CODE_INVALID_INPUT_BLOCKS, # noqa E501 

457 HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_UNEXPECTED: self.ERROR_CODE_UNKNOWN, 

458 }).get(result, self.ERROR_CODE_UNKNOWN) 

459 

460 def _get_blockchain_parameters(self, request): 

461 try: 

462 self.ensure_connection() 

463 params = self.hsm2dongle.get_signer_parameters() 

464 return (self.ERROR_CODE_OK, {"parameters": { 

465 "checkpoint": params.checkpoint, 

466 "minimum_difficulty": params.min_required_difficulty, 

467 "network": params.network.name.lower()} 

468 }) 

469 except (HSM2DongleError, HSM2DongleTimeoutError) as e: 

470 self.logger.error("Dongle error in get parameters: %s", str(e)) 

471 return (self.ERROR_CODE_DEVICE,) 

472 except HSM2DongleCommError: 

473 # Signal a communication problem and return a device error 

474 self._comm_issue = True 

475 self.logger.error("Dongle communication error in get parameters") 

476 return (self.ERROR_CODE_DEVICE,) 

477 

478 def _signer_heartbeat(self, request): 

479 try: 

480 self.ensure_connection() 

481 

482 heartbeat = self.hsm2dongle.get_signer_heartbeat(request["udValue"]) 

483 # Treat any user-errors as a device (unexpected) error 

484 if not(heartbeat[0]): 

485 return (self.ERROR_CODE_DEVICE,) 

486 heartbeat = heartbeat[1] 

487 

488 return (self.ERROR_CODE_OK, { 

489 "pubKey": heartbeat["pubKey"], 

490 "message": heartbeat["message"], 

491 "tweak": heartbeat["tweak"], 

492 "signature": { 

493 "r": heartbeat["signature"].r, 

494 "s": heartbeat["signature"].s 

495 } 

496 }) 

497 except (HSM2DongleError, HSM2DongleTimeoutError) as e: 

498 self.logger.error("Dongle error in signer heartbeat: %s", str(e)) 

499 return (self.ERROR_CODE_DEVICE,) 

500 except HSM2DongleCommError: 

501 # Signal a communication problem and return a device error 

502 self._comm_issue = True 

503 self.logger.error("Dongle communication error in signer heartbeat") 

504 return (self.ERROR_CODE_DEVICE,) 

505 

506 def _ui_heartbeat(self, request): 

507 try: 

508 self.ensure_connection() 

509 

510 # Check the current mode 

511 initial_mode = self.hsm2dongle.get_current_mode() 

512 

513 # Can only gather the UI heartbeat from either the Signer or 

514 # the UI heartbeat mode itself 

515 if not(initial_mode in [self.hsm2dongle.MODE.SIGNER, 

516 self.hsm2dongle.MODE.UI_HEARTBEAT]): 

517 self.logger.error("Dongle not in Signer or UI heartbeat mode when" 

518 " trying to gather UI heartbeat") 

519 return (self.ERROR_CODE_DEVICE,) 

520 

521 # Exit the signer 

522 if initial_mode == self.hsm2dongle.MODE.SIGNER: 

523 # This should raise a communication error due to USB 

524 # disconnection. Treat as successful 

525 try: 

526 self.hsm2dongle.exit_app() 

527 except HSM2DongleCommError: 

528 pass 

529 self._wait_and_reconnect() 

530 # Check we are now in UI heartbeat mode 

531 new_mode = self.hsm2dongle.get_current_mode() 

532 if new_mode != self.hsm2dongle.MODE.UI_HEARTBEAT: 

533 self.logger.error("Expected dongle to be in UI heartbeat" 

534 f" mode but got {new_mode}") 

535 return (self.ERROR_CODE_DEVICE,) 

536 

537 # Gather the heartbeat and immediately try to go back 

538 # to the signer. Deal with the heartbeat result later. 

539 heartbeat = self.hsm2dongle.get_ui_heartbeat(request["udValue"]) 

540 

541 # Exit the UI heartbeat to return to the signer 

542 if initial_mode == self.hsm2dongle.MODE.SIGNER: 

543 # This should raise a communication error due to USB 

544 # disconnection. Treat as successful 

545 try: 

546 self.hsm2dongle.exit_app() 

547 except HSM2DongleCommError: 

548 pass 

549 self._wait_and_reconnect() 

550 # Check we are now back in the Signer 

551 new_mode = self.hsm2dongle.get_current_mode() 

552 if new_mode != self.hsm2dongle.MODE.SIGNER: 

553 self.logger.error("Expected dongle to be in Signer" 

554 f" mode but got {new_mode}") 

555 return (self.ERROR_CODE_DEVICE,) 

556 

557 # Treat any user-errors as a device (unexpected) error 

558 if not(heartbeat[0]): 

559 return (self.ERROR_CODE_DEVICE,) 

560 heartbeat = heartbeat[1] 

561 

562 return (self.ERROR_CODE_OK, { 

563 "pubKey": heartbeat["pubKey"], 

564 "message": heartbeat["message"], 

565 "tweak": heartbeat["tweak"], 

566 "signature": { 

567 "r": heartbeat["signature"].r, 

568 "s": heartbeat["signature"].s 

569 } 

570 }) 

571 except (HSM2DongleError, HSM2DongleTimeoutError) as e: 

572 self.logger.error("Dongle error in UI heartbeat: %s", str(e)) 

573 return (self.ERROR_CODE_DEVICE,) 

574 except HSM2DongleCommError: 

575 # Signal a communication problem and return a device error 

576 self._comm_issue = True 

577 self.logger.error("Dongle communication error in UI heartbeat") 

578 return (self.ERROR_CODE_DEVICE,)