Coverage for ledger/protocol.py: 76%
299 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23import time
24from comm.protocol import HSM2Protocol, HSM2ProtocolError, HSM2ProtocolInterrupt
25from comm.platform import Platform
26from ledger.hsm2dongle import (
27 HSM2Dongle,
28 HSM2DongleBaseError,
29 HSM2DongleError,
30 HSM2DongleErrorResult,
31 HSM2DongleTimeoutError,
32 HSM2DongleCommError,
33 HSM2FirmwareVersion,
34 SighashComputationMode,
35)
36from comm.bitcoin import get_unsigned_tx, get_tx_hash
class HSM2ProtocolLedger(HSM2Protocol):
    """HSM2 protocol implementation that talks to a Ledger dongle.

    Every command handler returns a tuple whose first element is one of the
    ``ERROR_CODE_*`` values read from ``self`` (presumably defined on
    ``HSM2Protocol`` - confirm against the base class) and whose optional
    second element is the reply payload.
    """

    # Current manager supported versions for HSM UI and HSM SIGNER (<=)
    UI_VERSION = HSM2FirmwareVersion(5, 5, 1)
    APP_VERSION = HSM2FirmwareVersion(5, 5, 1)

    # Amount of time to wait to make sure the app is opened
    OPEN_APP_WAIT = 1  # second

    # Required minimum number of pin retries available to proceed with unlocking
    MIN_AVAILABLE_RETRIES = 2

    def __init__(self, pin, dongle):
        """Store the collaborators used to drive the device.

        :param pin: object used through ``get_pin()``, ``needs_change()``,
            ``start_change()``, ``get_new_pin()``, ``commit_change()`` and
            ``abort_change()``.
        :param dongle: dongle wrapper used for all device communication.
        """
        super().__init__()
        self.hsm2dongle = dongle
        self._comm_issue = False
        self.pin = pin

    def initialize_device(self):
        """Connect to the dongle and leave it running the signer app.

        :raises HSM2ProtocolError: if the connection fails, the dongle is not
            onboarded or a firmware version is unsupported.
        :raises HSM2ProtocolInterrupt: if manual user intervention is needed.
        """
        # Connection
        try:
            self.logger.info("Connecting to dongle")
            self.hsm2dongle.connect()
            self.logger.info("Connected to dongle")
        except HSM2DongleBaseError as e:
            self.logger.error(e)
            # Chain the original error so the root cause is kept in tracebacks
            raise HSM2ProtocolError(e) from e

        # Onboard check
        try:
            is_onboarded = self.hsm2dongle.is_onboarded()
            self.logger.info("Dongle onboarded: %s", "yes" if is_onboarded else "no")
            if not is_onboarded:
                self.logger.error("Dongle not onboarded, exiting")
                raise HSM2ProtocolError("Dongle not onboarded")
        except HSM2DongleBaseError:
            self.logger.info(
                "Could not determine onboarded status. If unlocked, "
                + "please enter the signing app and rerun the manager. Otherwise, "
                + f"{Platform.message('restart')} and try again"
            )
            raise HSM2ProtocolInterrupt()

        # Mode check
        self.logger.info("Finding mode")
        current_mode = self.hsm2dongle.get_current_mode()
        self.logger.debug("Mode #%s", current_mode)

        if current_mode == HSM2Dongle.MODE.BOOTLOADER:
            self._handle_bootloader()

            # After handling the bootloader, we need to reload the mode since
            # it should change
            self.logger.info("Finding mode")
            current_mode = self.hsm2dongle.get_current_mode()
            self.logger.debug("Mode #%s", current_mode)

        # In this point, the mode should be signer.
        # Otherwise, we tell the user to manually enter the signer and run
        # the manager again
        if current_mode != HSM2Dongle.MODE.SIGNER:
            self.logger.info(
                "Dongle mode unknown. Please manually enter the signer "
                "and re-run the manager"
            )
            raise HSM2ProtocolInterrupt()

        self.logger.info("Mode: Signing App")

        # Verify that the app's version is correct
        self._dongle_app_version = self.hsm2dongle.get_version()
        self._check_version(self._dongle_app_version, self.APP_VERSION, "App")

        # Get and report signer parameters
        signer_parameters = self.hsm2dongle.get_signer_parameters()
        self.logger.info("Gathered signer parameters")
        self.logger.info("Checkpoint 0x%s", signer_parameters.checkpoint)
        self.logger.info(
            "Minimum required difficulty %s",
            hex(signer_parameters.min_required_difficulty),
        )
        self.logger.info("Network %s", signer_parameters.network.name)

    def report_comm_issue(self):
        """Flag a communication problem so the next command reconnects."""
        self._comm_issue = True

    def ensure_connection(self):
        """Reconnect to the dongle if a communication issue was flagged.

        :raises HSM2DongleCommError: if re-initializing the device fails with
            an ``HSM2ProtocolError``.
        """
        if not self._comm_issue:
            return

        # Attempt to reconnect
        self.logger.info("Attempting dongle reconnection")
        self.hsm2dongle.disconnect()
        try:
            self.initialize_device()
            self._comm_issue = False
            self.logger.info("Reconnection successful")
        except HSM2ProtocolError as e:
            # Capture any initialization issues
            # (which would include communication problems,
            # such as failure to connect) and bubble them
            # as a dongle communication error, so that
            # the reply to the client will be appropriate and
            # a subsequent reconnection will be attempted
            # upon the next command
            # (don't log anything here, initialize_device will have done so)
            raise HSM2DongleCommError(
                "While attempting to reconnect: %s" % str(e)
            ) from e

    # Do what needs to be done to get past the "bootloader" mode
    # That includes checking the bootloader (UI) version, testing echo,
    # unlocking the device and (potentially) changing the device PIN
    # Finally, exiting the bootloader which should take the user to the
    # signer app.
    def _handle_bootloader(self):
        """Take the dongle from bootloader mode into the signer app.

        :raises HSM2ProtocolError: on unsupported UI version, echo failure or
            PIN mismatch.
        :raises HSM2ProtocolInterrupt: if there are too few PIN retries left,
            the retries cannot be read, or a PIN change was attempted.
        """
        self.logger.info("Mode: Bootloader")

        # Version check
        self._dongle_ui_version = self.hsm2dongle.get_version()
        self._check_version(self._dongle_ui_version, self.UI_VERSION, "UI")

        # Echo check
        self.logger.info("Sending echo")
        if not self.hsm2dongle.echo():
            self._error("Echo error")
        self.logger.info("Echo OK")

        # Get the number of retries available to unlock the device
        # Then, only proceed if there is more than the minimum available
        # retries required (otherwise we risk wiping the device)
        try:
            self.logger.info("Retrieving available pin retries")
            retries = self.hsm2dongle.get_retries()
            self.logger.info("Available pin retries: %d", retries)
            if retries < self.MIN_AVAILABLE_RETRIES:
                self.logger.error(
                    "Available number of pin retries (%d) not enough "
                    "to attempt a device unlock. Aborting.",
                    retries,
                )
                raise HSM2ProtocolInterrupt()
        except HSM2DongleBaseError as e:
            self.logger.error(
                "While trying to get number of pin retries: %s. Aborting.", str(e)
            )
            raise HSM2ProtocolInterrupt()

        # Unlock device with PIN
        self.logger.info("Unlocking with PIN")
        if not self.hsm2dongle.unlock(self.pin.get_pin()):
            self._error("Unable to unlock: PIN mismatch")
        self.logger.info("PIN accepted")

        # First PIN use check
        if self.pin.needs_change():
            try:
                self.logger.info("PIN change need detected. Generating and changing PIN")
                self.pin.start_change()
                self.logger.info("Sending new PIN")
                if not self.hsm2dongle.new_pin(self.pin.get_new_pin()):
                    raise Exception("Dongle reported fail to change pin. Pin invalid?")
                self.pin.commit_change()
                self.logger.info(
                    f"PIN changed. Please {Platform.message('restart')}"
                )
            except Exception as e:
                self.pin.abort_change()
                # The platform message is passed as a lazy logging argument
                # so that it is never interpreted as a %-format template
                self.logger.error(
                    "Error changing PIN: %s. Please %s and try again",
                    e,
                    Platform.message("restart"),
                )
            finally:
                # Whether the change succeeded or not, execution stops here.
                # NOTE: raising from ``finally`` replaces any exception
                # that is still propagating at this point.
                raise HSM2ProtocolInterrupt()

        # This loads the app if in bootloader mode
        # This usually fails with a timeout, but its fine cause
        # what happens is that when opening the app,
        # the ledger disconnects from usb and reconnects
        self.logger.info("Loading Signing app")
        try:
            self.hsm2dongle.exit_menu()
        except Exception:
            # exit_menu() always throws due to USB disconnection. we don't care
            pass

        self._wait_and_reconnect()

    def _wait_and_reconnect(self):
        """Sleep ``OPEN_APP_WAIT`` seconds, then disconnect and connect again."""
        # Wait a little bit to make sure the app is loaded
        # and then reconnect to the dongle
        time.sleep(self.OPEN_APP_WAIT)
        self.hsm2dongle.disconnect()
        self.hsm2dongle.connect()

    def _exit_app_and_reconnect(self):
        """Ask the dongle to exit the running app and reconnect to it."""
        # This should raise a communication error due to USB
        # disconnection. Treat as successful
        try:
            self.hsm2dongle.exit_app()
        except HSM2DongleCommError:
            pass
        self._wait_and_reconnect()

    def _check_version(self, fware_version, mware_version, name):
        """Log the firmware version and fail if the manager does not support it.

        :param fware_version: version reported by the dongle.
        :param mware_version: maximum version supported by this manager.
        :param name: label used in the log and error messages.
        :raises HSM2ProtocolError: if ``mware_version.supports(fware_version)``
            is falsy.
        """
        self.logger.info(
            "%s version: %s (supported <= %s)", name, fware_version, mware_version
        )
        if not mware_version.supports(fware_version):
            self._error(
                "Unsupported %s version: Dongle reports %s, Node needs <= %s"
                % (name, fware_version, mware_version)
            )

    def _error(self, msg):
        """Log ``msg`` as an error and raise it as an ``HSM2ProtocolError``."""
        self.logger.error(msg)
        raise HSM2ProtocolError(msg)

    def _get_pubkey(self, request):
        """Handle a public key request for ``request["keyId"]``.

        :raises HSM2ProtocolError: on a generic dongle error.
        """
        try:
            self.ensure_connection()
            return (
                self.ERROR_CODE_OK,
                {"pubKey": self.hsm2dongle.get_public_key(request["keyId"])},
            )
        except HSM2DongleErrorResult:
            return (self.ERROR_CODE_INVALID_KEYID,)
        except HSM2DongleTimeoutError:
            self.logger.error("Dongle timeout getting public key")
            return (self.ERROR_CODE_DEVICE,)
        except HSM2DongleCommError:
            # Signal a communication problem and return a device error
            self._comm_issue = True
            self.logger.error("Dongle communication error getting public key")
            return (self.ERROR_CODE_DEVICE,)
        except HSM2DongleError as e:
            return self._error("Dongle error in get_pubkey: %s" % str(e))

    def _sign(self, request):
        """Handle a signing request, either unauthorized (hash) or authorized (tx).

        A request carrying ``message.hash`` is signed without authorization;
        any other request must carry valid ``auth`` and transaction fields.

        :raises HSM2ProtocolError: on a generic dongle error.
        """
        # First validate the required fields are OK
        if "message" in request and "hash" in request["message"]:
            # Unauthorized signing
            message_validation = self._validate_message(request, what="hash")
            if message_validation < self.ERROR_CODE_OK:
                return (message_validation,)

            try:
                self.ensure_connection()
                sign_result = self.hsm2dongle.sign_unauthorized(
                    key_id=request["keyId"], hash=request["message"]["hash"]
                )
            except HSM2DongleTimeoutError:
                self.logger.error("Dongle timeout signing")
                return (self.ERROR_CODE_DEVICE,)
            except HSM2DongleCommError:
                # Signal a communication problem and return a device error
                self._comm_issue = True
                self.logger.error("Dongle communication error signing")
                return (self.ERROR_CODE_DEVICE,)
            except HSM2DongleError as e:
                # _error() always raises, so sign_result is never read unbound
                self._error("Dongle error in sign: %s" % str(e))
        else:
            # Authorized signing
            auth_validation = self._validate_auth(request, mandatory=True)
            if auth_validation < self.ERROR_CODE_OK:
                return (auth_validation,)

            message_validation = self._validate_message(request, what="tx")
            if message_validation < self.ERROR_CODE_OK:
                return (message_validation,)

            # Shorthand
            msg = request["message"]

            # Make sure the transaction
            # is fully unsigned before sending.
            try:
                unsigned_btc_tx = get_unsigned_tx(msg["tx"])
                self.logger.debug("Unsigned BTC tx: %s", get_tx_hash(unsigned_btc_tx))
            except Exception as e:
                self.logger.error("Error unsigning BTC tx: %s", str(e))
                return (self.ERROR_CODE_INVALID_MESSAGE,)

            try:
                self.ensure_connection()
                sign_result = self.hsm2dongle.sign_authorized(
                    key_id=request["keyId"],
                    rsk_tx_receipt=request["auth"]["receipt"],
                    receipt_merkle_proof=request["auth"]["receipt_merkle_proof"],
                    btc_tx=unsigned_btc_tx,
                    input_index=msg["input"],
                    sighash_computation_mode=SighashComputationMode(
                        msg["sighashComputationMode"]),
                    witness_script=msg.get("witnessScript"),
                    outpoint_value=msg.get("outpointValue")
                )
            except HSM2DongleTimeoutError:
                self.logger.error("Dongle timeout signing")
                return (self.ERROR_CODE_DEVICE,)
            except HSM2DongleCommError:
                # Signal a communication problem and return a device error
                self._comm_issue = True
                self.logger.error("Dongle communication error signing")
                return (self.ERROR_CODE_DEVICE,)
            except HSM2DongleError as e:
                # _error() always raises, so sign_result is never read unbound
                self._error("Dongle error in sign: %s" % str(e))

        # Signing result is the same for both authorized and non authorized signing
        if not sign_result[0]:
            return (self._translate_sign_error(sign_result[1]),)
        signature = sign_result[1]

        return (self.ERROR_CODE_OK, {"signature": {"r": signature.r, "s": signature.s}})

    def _translate_sign_error(self, error_code):
        """Map a dongle signing error to a protocol error code.

        Codes that are not in the table map to ``ERROR_CODE_UNKNOWN``.
        """
        return (
            {
                HSM2Dongle.RESPONSE.SIGN.ERROR_PATH: self.ERROR_CODE_INVALID_KEYID,
                HSM2Dongle.RESPONSE.SIGN.ERROR_BTC_TX: self.ERROR_CODE_INVALID_MESSAGE,
                HSM2Dongle.RESPONSE.SIGN.ERROR_TX_RECEIPT: self.ERROR_CODE_INVALID_AUTH,
                HSM2Dongle.RESPONSE.SIGN.ERROR_MERKLE_PROOF: self.ERROR_CODE_INVALID_AUTH,
                HSM2Dongle.RESPONSE.SIGN.ERROR_HASH: self.ERROR_CODE_INVALID_MESSAGE,
                HSM2Dongle.RESPONSE.SIGN.ERROR_UNEXPECTED: self.ERROR_CODE_DEVICE,
            }
        ).get(error_code, self.ERROR_CODE_UNKNOWN)

    def _blockchain_state(self, request):
        """Handle a blockchain state request.

        The flat ``updating.*`` keys reported by the dongle are regrouped
        under a nested ``updating`` dictionary in the reply.
        """
        try:
            self.ensure_connection()
            state = self.hsm2dongle.get_blockchain_state()
        except (HSM2DongleError, HSM2DongleTimeoutError) as e:
            self.logger.error("Dongle error getting blockchain state: %s", str(e))
            return (self.ERROR_CODE_DEVICE,)
        except HSM2DongleCommError:
            # Signal a communication problem and return a device error
            self._comm_issue = True
            self.logger.error("Dongle communication error getting blockchain state")
            return (self.ERROR_CODE_DEVICE,)

        state_result = {
            "best_block": state["best_block"],
            "newest_valid_block": state["newest_valid_block"],
            "ancestor_block": state["ancestor_block"],
            "ancestor_receipts_root": state["ancestor_receipts_root"],
            "updating": {
                "best_block": state["updating.best_block"],
                "newest_valid_block": state["updating.newest_valid_block"],
                "next_expected_block": state["updating.next_expected_block"],
                "total_difficulty": state["updating.total_difficulty"],
                "in_progress": state["updating.in_progress"],
                "already_validated": state["updating.already_validated"],
                "found_best_block": state["updating.found_best_block"],
            },
        }

        return (self.ERROR_CODE_OK, {"state": state_result})

    def _reset_advance_blockchain(self, request):
        """Handle a request to reset an in-progress blockchain advance."""
        try:
            self.ensure_connection()
            self.hsm2dongle.reset_advance_blockchain()
        except (HSM2DongleError, HSM2DongleTimeoutError) as e:
            self.logger.error("Dongle error resetting advance blockchain: %s", str(e))
            return (self.ERROR_CODE_DEVICE,)
        except HSM2DongleCommError:
            # Signal a communication problem and return a device error
            self._comm_issue = True
            self.logger.error("Dongle communication error resetting advance blockchain")
            return (self.ERROR_CODE_DEVICE,)

        return (self.ERROR_CODE_OK, {})

    def _advance_blockchain(self, request):
        """Handle an advance blockchain request.

        Sends ``request["blocks"]`` and ``request["brothers"]`` to the dongle
        and translates the second element of its result.
        """
        try:
            self.ensure_connection()
            advance_result = self.hsm2dongle.advance_blockchain(
                request["blocks"], request["brothers"]
            )
            return (self._translate_advance_result(advance_result[1]), {})
        except (HSM2DongleError, HSM2DongleTimeoutError) as e:
            self.logger.error("Dongle error in advance blockchain: %s", str(e))
            return (self.ERROR_CODE_DEVICE,)
        except HSM2DongleCommError:
            # Signal a communication problem and return a device error
            self._comm_issue = True
            self.logger.error("Dongle communication error in advance blockchain")
            return (self.ERROR_CODE_DEVICE,)

    def _translate_advance_result(self, result):
        """Map a dongle advance-blockchain result to a protocol error code.

        Results that are not in the table map to ``ERROR_CODE_UNKNOWN``.
        """
        DERR = HSM2Dongle.RESPONSE.ADVANCE
        return ({
            DERR.OK_TOTAL: self.ERROR_CODE_OK,
            DERR.OK_PARTIAL: self.ERROR_CODE_OK_PARTIAL,
            DERR.ERROR_INIT: self.ERROR_CODE_DEVICE,
            DERR.ERROR_COMPUTE_METADATA: self.ERROR_CODE_INVALID_INPUT_BLOCKS,  # noqa E501
            DERR.ERROR_METADATA: self.ERROR_CODE_DEVICE,
            DERR.ERROR_BLOCK_DATA: self.ERROR_CODE_DEVICE,
            DERR.ERROR_INVALID_BLOCK: self.ERROR_CODE_INVALID_INPUT_BLOCKS,  # noqa E501
            DERR.ERROR_POW_INVALID: self.ERROR_CODE_POW_INVALID,
            DERR.ERROR_CHAINING_MISMATCH: self.ERROR_CODE_CHAINING_MISMATCH,  # noqa E501
            DERR.ERROR_UNSUPPORTED_CHAIN: self.ERROR_CODE_INVALID_INPUT_BLOCKS,  # noqa E501
            DERR.ERROR_INVALID_BROTHERS: self.ERROR_CODE_INVALID_BROTHERS,  # noqa E501
            DERR.ERROR_UNEXPECTED: self.ERROR_CODE_UNKNOWN,
        }).get(result, self.ERROR_CODE_UNKNOWN)

    def _update_ancestor_block(self, request):
        """Handle an update ancestor block request for ``request["blocks"]``."""
        try:
            self.ensure_connection()
            update_result = self.hsm2dongle.update_ancestor(request["blocks"])
            return (self._translate_update_ancestor_result(update_result[1]), {})
        except (HSM2DongleError, HSM2DongleTimeoutError) as e:
            self.logger.error("Dongle error in update ancestor: %s", str(e))
            return (self.ERROR_CODE_DEVICE,)
        except HSM2DongleCommError:
            # Signal a communication problem and return a device error
            self._comm_issue = True
            self.logger.error("Dongle communication error in update ancestor")
            return (self.ERROR_CODE_DEVICE,)

    def _translate_update_ancestor_result(self, result):
        """Map a dongle update-ancestor result to a protocol error code.

        Results that are not in the table map to ``ERROR_CODE_UNKNOWN``.
        """
        return ({
            HSM2Dongle.RESPONSE.UPD_ANCESTOR.OK_TOTAL: self.ERROR_CODE_OK,
            HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_INIT: self.ERROR_CODE_DEVICE,
            HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_COMPUTE_METADATA: self.ERROR_CODE_INVALID_INPUT_BLOCKS,  # noqa E501
            HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_METADATA: self.ERROR_CODE_DEVICE,
            HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_BLOCK_DATA: self.ERROR_CODE_DEVICE,
            HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_INVALID_BLOCK: self.ERROR_CODE_INVALID_INPUT_BLOCKS,  # noqa E501
            HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_CHAINING_MISMATCH: self.ERROR_CODE_CHAINING_MISMATCH,  # noqa E501
            HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_TIP_MISMATCH: self.ERROR_CODE_TIP_MISMATCH,  # noqa E501
            HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_REMOVE_MM_FIELDS: self.ERROR_CODE_INVALID_INPUT_BLOCKS,  # noqa E501
            HSM2Dongle.RESPONSE.UPD_ANCESTOR.ERROR_UNEXPECTED: self.ERROR_CODE_UNKNOWN,
        }).get(result, self.ERROR_CODE_UNKNOWN)

    def _get_blockchain_parameters(self, request):
        """Handle a request for the signer's blockchain parameters.

        The network name is reported in lowercase.
        """
        try:
            self.ensure_connection()
            params = self.hsm2dongle.get_signer_parameters()
            return (self.ERROR_CODE_OK, {"parameters": {
                "checkpoint": params.checkpoint,
                "minimum_difficulty": params.min_required_difficulty,
                "network": params.network.name.lower()}
            })
        except (HSM2DongleError, HSM2DongleTimeoutError) as e:
            self.logger.error("Dongle error in get parameters: %s", str(e))
            return (self.ERROR_CODE_DEVICE,)
        except HSM2DongleCommError:
            # Signal a communication problem and return a device error
            self._comm_issue = True
            self.logger.error("Dongle communication error in get parameters")
            return (self.ERROR_CODE_DEVICE,)

    def _format_heartbeat_reply(self, heartbeat):
        """Build the successful reply shared by the signer and UI heartbeats.

        :param heartbeat: mapping with ``pubKey``, ``message``, ``tweak`` and
            a ``signature`` object exposing ``r`` and ``s``.
        """
        return (self.ERROR_CODE_OK, {
            "pubKey": heartbeat["pubKey"],
            "message": heartbeat["message"],
            "tweak": heartbeat["tweak"],
            "signature": {
                "r": heartbeat["signature"].r,
                "s": heartbeat["signature"].s
            }
        })

    def _signer_heartbeat(self, request):
        """Handle a signer heartbeat request using ``request["udValue"]``."""
        try:
            self.ensure_connection()

            heartbeat = self.hsm2dongle.get_signer_heartbeat(request["udValue"])
            # Treat any user-errors as a device (unexpected) error
            if not heartbeat[0]:
                return (self.ERROR_CODE_DEVICE,)

            return self._format_heartbeat_reply(heartbeat[1])
        except (HSM2DongleError, HSM2DongleTimeoutError) as e:
            self.logger.error("Dongle error in signer heartbeat: %s", str(e))
            return (self.ERROR_CODE_DEVICE,)
        except HSM2DongleCommError:
            # Signal a communication problem and return a device error
            self._comm_issue = True
            self.logger.error("Dongle communication error in signer heartbeat")
            return (self.ERROR_CODE_DEVICE,)

    def _ui_heartbeat(self, request):
        """Handle a UI heartbeat request using ``request["udValue"]``.

        When the dongle starts in Signer mode, the signer is exited to reach
        the UI heartbeat mode and, once the heartbeat has been gathered, the
        UI heartbeat is exited to go back to the signer.
        """
        try:
            self.ensure_connection()

            # Check the current mode
            initial_mode = self.hsm2dongle.get_current_mode()

            # Can only gather the UI heartbeat from either the Signer or
            # the UI heartbeat mode itself
            if initial_mode not in [self.hsm2dongle.MODE.SIGNER,
                                    self.hsm2dongle.MODE.UI_HEARTBEAT]:
                self.logger.error("Dongle not in Signer or UI heartbeat mode when"
                                  " trying to gather UI heartbeat")
                return (self.ERROR_CODE_DEVICE,)

            # Exit the signer
            if initial_mode == self.hsm2dongle.MODE.SIGNER:
                self._exit_app_and_reconnect()
                # Check we are now in UI heartbeat mode
                new_mode = self.hsm2dongle.get_current_mode()
                if new_mode != self.hsm2dongle.MODE.UI_HEARTBEAT:
                    self.logger.error("Expected dongle to be in UI heartbeat"
                                      f" mode but got {new_mode}")
                    return (self.ERROR_CODE_DEVICE,)

            # Gather the heartbeat and immediately try to go back
            # to the signer. Deal with the heartbeat result later.
            heartbeat = self.hsm2dongle.get_ui_heartbeat(request["udValue"])

            # Exit the UI heartbeat to return to the signer
            if initial_mode == self.hsm2dongle.MODE.SIGNER:
                self._exit_app_and_reconnect()
                # Check we are now back in the Signer
                new_mode = self.hsm2dongle.get_current_mode()
                if new_mode != self.hsm2dongle.MODE.SIGNER:
                    self.logger.error("Expected dongle to be in Signer"
                                      f" mode but got {new_mode}")
                    return (self.ERROR_CODE_DEVICE,)

            # Treat any user-errors as a device (unexpected) error
            if not heartbeat[0]:
                return (self.ERROR_CODE_DEVICE,)

            return self._format_heartbeat_reply(heartbeat[1])
        except (HSM2DongleError, HSM2DongleTimeoutError) as e:
            self.logger.error("Dongle error in UI heartbeat: %s", str(e))
            return (self.ERROR_CODE_DEVICE,)
        except HSM2DongleCommError:
            # Signal a communication problem and return a device error
            self._comm_issue = True
            self.logger.error("Dongle communication error in UI heartbeat")
            return (self.ERROR_CODE_DEVICE,)