Coverage for tests/ledger/test_protocol.py: 100%

445 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23from unittest import TestCase 

24from unittest.mock import Mock, call, patch 

25from parameterized import parameterized 

26from comm.protocol import HSM2ProtocolError 

27from ledger.protocol import HSM2ProtocolLedger 

28from ledger.hsm2dongle import ( 

29 HSM2Dongle, 

30 HSM2DongleError, 

31 HSM2DongleErrorResult, 

32 HSM2DongleTimeoutError, 

33 HSM2DongleCommError, 

34 HSM2FirmwareParameters, 

35 SighashComputationMode, 

36) 

37from ledger.version import HSM2FirmwareVersion 

38 

39import logging 

40 

41logging.disable(logging.CRITICAL) 

42 

43 

44class TestHSM2ProtocolLedger(TestCase): 

45 def setUp(self): 

46 self.pin = Mock() 

47 self.dongle = Mock() 

48 self.dongle.connect = Mock() 

49 self.dongle.disconnect = Mock() 

50 self.dongle.is_onboarded = Mock(return_value=True) 

51 self.dongle.get_current_mode = Mock(return_value=HSM2Dongle.MODE.SIGNER) 

52 self.dongle.get_version = Mock(return_value=HSM2FirmwareVersion(5, 5, 1)) 

53 self.dongle.get_signer_parameters = Mock(return_value=Mock( 

54 min_required_difficulty=123)) 

55 self.protocol = HSM2ProtocolLedger(self.pin, self.dongle) 

56 self.protocol.initialize_device() 

57 

58 @patch("comm.protocol.BIP32Path") 

59 def test_get_pubkey_ok(self, BIP32PathMock): 

60 BIP32PathMock.return_value = "the-key-id" 

61 self.dongle.get_public_key.return_value = "this-is-the-public-key" 

62 

63 self.assertEqual( 

64 { 

65 "errorcode": 0, 

66 "pubKey": "this-is-the-public-key" 

67 }, 

68 self.protocol.handle_request({ 

69 "version": 5, 

70 "command": "getPubKey", 

71 "keyId": "m/44'/1'/2'/3/4" 

72 }), 

73 ) 

74 self.assertEqual([call("the-key-id")], self.dongle.get_public_key.call_args_list) 

75 self.assertFalse(self.dongle.disconnect.called) 

76 

77 @patch("comm.protocol.BIP32Path") 

78 def test_get_pubkey_error(self, BIP32PathMock): 

79 BIP32PathMock.return_value = "the-key-id" 

80 self.dongle.get_public_key.side_effect = HSM2DongleErrorResult() 

81 

82 self.assertEqual( 

83 {"errorcode": -103}, 

84 self.protocol.handle_request({ 

85 "version": 5, 

86 "command": "getPubKey", 

87 "keyId": "m/44'/1'/2'/3/4" 

88 }), 

89 ) 

90 self.assertEqual([call("the-key-id")], self.dongle.get_public_key.call_args_list) 

91 self.assertFalse(self.dongle.disconnect.called) 

92 

93 @patch("comm.protocol.BIP32Path") 

94 def test_get_pubkey_timeout(self, BIP32PathMock): 

95 BIP32PathMock.return_value = "the-key-id" 

96 self.dongle.get_public_key.side_effect = HSM2DongleTimeoutError() 

97 

98 self.assertEqual( 

99 {"errorcode": -905}, 

100 self.protocol.handle_request({ 

101 "version": 5, 

102 "command": "getPubKey", 

103 "keyId": "m/44'/1'/2'/3/4" 

104 }), 

105 ) 

106 self.assertEqual([call("the-key-id")], self.dongle.get_public_key.call_args_list) 

107 self.assertFalse(self.dongle.disconnect.called) 

108 

109 @patch("comm.protocol.BIP32Path") 

110 def test_get_pubkey_commerror_reconnection(self, BIP32PathMock): 

111 BIP32PathMock.return_value = "the-key-id" 

112 self.dongle.get_public_key.side_effect = HSM2DongleCommError() 

113 

114 self.assertEqual( 

115 {"errorcode": -905}, 

116 self.protocol.handle_request({ 

117 "version": 5, 

118 "command": "getPubKey", 

119 "keyId": "m/44'/1'/2'/3/4" 

120 }), 

121 ) 

122 self.assertEqual([call("the-key-id")], self.dongle.get_public_key.call_args_list) 

123 self.assertFalse(self.dongle.disconnect.called) 

124 

125 # Reconnection logic testing 

126 self.dongle.get_public_key.side_effect = None 

127 self.dongle.get_public_key.return_value = "this-is-the-public-key" 

128 

129 self.assertEqual( 

130 { 

131 "errorcode": 0, 

132 "pubKey": "this-is-the-public-key" 

133 }, 

134 self.protocol.handle_request({ 

135 "version": 5, 

136 "command": "getPubKey", 

137 "keyId": "m/44'/1'/2'/3/4" 

138 }), 

139 ) 

140 

141 self._assert_reconnected() 

142 

143 @patch("comm.protocol.BIP32Path") 

144 def test_get_pubkey_unexpected_error(self, BIP32PathMock): 

145 BIP32PathMock.return_value = "the-key-id" 

146 self.dongle.get_public_key.side_effect = HSM2DongleError() 

147 

148 with self.assertRaises(HSM2ProtocolError): 

149 self.protocol.handle_request({ 

150 "version": 5, 

151 "command": "getPubKey", 

152 "keyId": "m/44'/1'/2'/3/4" 

153 }) 

154 

155 self.assertEqual([call("the-key-id")], self.dongle.get_public_key.call_args_list) 

156 self.assertFalse(self.dongle.disconnect.called) 

157 

158 @patch("ledger.protocol.get_tx_hash") 

159 @patch("ledger.protocol.get_unsigned_tx") 

160 @patch("comm.protocol.BIP32Path") 

161 def test_sign_authorized_legacy_ok(self, BIP32PathMock, get_unsigned_tx_mock, _): 

162 BIP32PathMock.return_value = "the-key-id" 

163 signature = Mock(r="this-is-r", s="this-is-s") 

164 self.dongle.sign_authorized.return_value = (True, signature) 

165 get_unsigned_tx_mock.return_value = "the-unsigned-tx" 

166 

167 self.assertEqual( 

168 { 

169 "errorcode": 0, 

170 "signature": { 

171 "r": "this-is-r", 

172 "s": "this-is-s" 

173 } 

174 }, 

175 self.protocol.handle_request({ 

176 "version": 5, 

177 "command": "sign", 

178 "keyId": "m/44'/1'/2'/3/4", 

179 "auth": { 

180 "receipt": "aa", 

181 "receipt_merkle_proof": ["cc", "dd"] 

182 }, 

183 "message": { 

184 "sighashComputationMode": "legacy", 

185 "tx": "eeff", 

186 "input": 12 

187 }, 

188 }), 

189 ) 

190 

191 self.assertEqual( 

192 [ 

193 call( 

194 key_id="the-key-id", 

195 rsk_tx_receipt="aa", 

196 receipt_merkle_proof=["cc", "dd"], 

197 btc_tx="the-unsigned-tx", 

198 input_index=12, 

199 sighash_computation_mode=SighashComputationMode.LEGACY, 

200 witness_script=None, 

201 outpoint_value=None, 

202 ) 

203 ], 

204 self.dongle.sign_authorized.call_args_list, 

205 ) 

206 self.assertFalse(self.dongle.disconnect.called) 

207 

208 @patch("ledger.protocol.get_tx_hash") 

209 @patch("ledger.protocol.get_unsigned_tx") 

210 @patch("comm.protocol.BIP32Path") 

211 def test_sign_authorized_segwit_ok(self, BIP32PathMock, get_unsigned_tx_mock, _): 

212 BIP32PathMock.return_value = "the-key-id" 

213 signature = Mock(r="this-is-r", s="this-is-s") 

214 self.dongle.sign_authorized.return_value = (True, signature) 

215 get_unsigned_tx_mock.return_value = "the-unsigned-tx" 

216 

217 self.assertEqual( 

218 { 

219 "errorcode": 0, 

220 "signature": { 

221 "r": "this-is-r", 

222 "s": "this-is-s" 

223 } 

224 }, 

225 self.protocol.handle_request({ 

226 "version": 5, 

227 "command": "sign", 

228 "keyId": "m/44'/1'/2'/3/4", 

229 "auth": { 

230 "receipt": "aa", 

231 "receipt_merkle_proof": ["cc", "dd"] 

232 }, 

233 "message": { 

234 "sighashComputationMode": "segwit", 

235 "tx": "eeff", 

236 "input": 12, 

237 "witnessScript": "aabbccddeeff", 

238 "outpointValue": 123000456 

239 }, 

240 }), 

241 ) 

242 

243 self.assertEqual( 

244 [ 

245 call( 

246 key_id="the-key-id", 

247 rsk_tx_receipt="aa", 

248 receipt_merkle_proof=["cc", "dd"], 

249 btc_tx="the-unsigned-tx", 

250 input_index=12, 

251 sighash_computation_mode=SighashComputationMode.SEGWIT, 

252 witness_script="aabbccddeeff", 

253 outpoint_value=123000456, 

254 ) 

255 ], 

256 self.dongle.sign_authorized.call_args_list, 

257 ) 

258 self.assertFalse(self.dongle.disconnect.called) 

259 

260 @parameterized.expand([ 

261 ("path", -1, -103), 

262 ("btc_tx", -2, -102), 

263 ("receipt", -3, -101), 

264 ("merkle_proof", -4, -101), 

265 ("unexpected", -10, -905), 

266 ("unknown", -100, -906), 

267 ]) 

268 @patch("ledger.protocol.get_tx_hash") 

269 @patch("ledger.protocol.get_unsigned_tx") 

270 @patch("comm.protocol.BIP32Path") 

271 def test_sign_authorized_legacy_error( 

272 self, 

273 _, 

274 dongle_error_code, 

275 protocol_error_code, 

276 BIP32PathMock, 

277 get_unsigned_tx_mock, 

278 __, 

279 ): 

280 BIP32PathMock.return_value = "the-key-id" 

281 self.dongle.sign_authorized.return_value = (False, dongle_error_code) 

282 get_unsigned_tx_mock.return_value = "the-unsigned-tx" 

283 

284 self.assertEqual( 

285 {"errorcode": protocol_error_code}, 

286 self.protocol.handle_request({ 

287 "version": 5, 

288 "command": "sign", 

289 "keyId": "m/44'/1'/2'/3/4", 

290 "auth": { 

291 "receipt": "aa", 

292 "receipt_merkle_proof": ["cc", "dd"] 

293 }, 

294 "message": { 

295 "sighashComputationMode": "legacy", 

296 "tx": "eeff", 

297 "input": 12 

298 }, 

299 }), 

300 ) 

301 

302 self.assertEqual( 

303 [ 

304 call( 

305 key_id="the-key-id", 

306 rsk_tx_receipt="aa", 

307 receipt_merkle_proof=["cc", "dd"], 

308 btc_tx="the-unsigned-tx", 

309 input_index=12, 

310 sighash_computation_mode=SighashComputationMode.LEGACY, 

311 witness_script=None, 

312 outpoint_value=None, 

313 ) 

314 ], 

315 self.dongle.sign_authorized.call_args_list, 

316 ) 

317 self.assertFalse(self.dongle.disconnect.called) 

318 

319 @parameterized.expand([ 

320 ("path", -1, -103), 

321 ("btc_tx", -2, -102), 

322 ("receipt", -3, -101), 

323 ("merkle_proof", -4, -101), 

324 ("unexpected", -10, -905), 

325 ("unknown", -100, -906), 

326 ]) 

327 @patch("ledger.protocol.get_tx_hash") 

328 @patch("ledger.protocol.get_unsigned_tx") 

329 @patch("comm.protocol.BIP32Path") 

330 def test_sign_authorized_segwit_error( 

331 self, 

332 _, 

333 dongle_error_code, 

334 protocol_error_code, 

335 BIP32PathMock, 

336 get_unsigned_tx_mock, 

337 __, 

338 ): 

339 BIP32PathMock.return_value = "the-key-id" 

340 self.dongle.sign_authorized.return_value = (False, dongle_error_code) 

341 get_unsigned_tx_mock.return_value = "the-unsigned-tx" 

342 

343 self.assertEqual( 

344 {"errorcode": protocol_error_code}, 

345 self.protocol.handle_request({ 

346 "version": 5, 

347 "command": "sign", 

348 "keyId": "m/44'/1'/2'/3/4", 

349 "auth": { 

350 "receipt": "aa", 

351 "receipt_merkle_proof": ["cc", "dd"] 

352 }, 

353 "message": { 

354 "sighashComputationMode": "segwit", 

355 "tx": "eeff", 

356 "input": 12, 

357 "witnessScript": "aabbccddeeff", 

358 "outpointValue": 123000456 

359 }, 

360 }), 

361 ) 

362 

363 self.assertEqual( 

364 [ 

365 call( 

366 key_id="the-key-id", 

367 rsk_tx_receipt="aa", 

368 receipt_merkle_proof=["cc", "dd"], 

369 btc_tx="the-unsigned-tx", 

370 input_index=12, 

371 sighash_computation_mode=SighashComputationMode.SEGWIT, 

372 witness_script="aabbccddeeff", 

373 outpoint_value=123000456, 

374 ) 

375 ], 

376 self.dongle.sign_authorized.call_args_list, 

377 ) 

378 self.assertFalse(self.dongle.disconnect.called) 

379 

380 @patch("ledger.protocol.get_tx_hash") 

381 @patch("ledger.protocol.get_unsigned_tx") 

382 @patch("comm.protocol.BIP32Path") 

383 def test_sign_authorized_legacy_timeout(self, BIP32PathMock, get_unsigned_tx_mock, _): 

384 BIP32PathMock.return_value = "the-key-id" 

385 self.dongle.sign_authorized.side_effect = HSM2DongleTimeoutError() 

386 get_unsigned_tx_mock.return_value = "the-unsigned-tx" 

387 

388 self.assertEqual( 

389 {"errorcode": -905}, 

390 self.protocol.handle_request({ 

391 "version": 5, 

392 "command": "sign", 

393 "keyId": "m/44'/1'/2'/3/4", 

394 "auth": { 

395 "receipt": "aa", 

396 "receipt_merkle_proof": ["cc", "dd"] 

397 }, 

398 "message": { 

399 "sighashComputationMode": "legacy", 

400 "tx": "eeff", 

401 "input": 12 

402 }, 

403 }), 

404 ) 

405 

406 self.assertEqual( 

407 [ 

408 call( 

409 key_id="the-key-id", 

410 rsk_tx_receipt="aa", 

411 receipt_merkle_proof=["cc", "dd"], 

412 btc_tx="the-unsigned-tx", 

413 input_index=12, 

414 sighash_computation_mode=SighashComputationMode.LEGACY, 

415 witness_script=None, 

416 outpoint_value=None, 

417 ) 

418 ], 

419 self.dongle.sign_authorized.call_args_list, 

420 ) 

421 self.assertFalse(self.dongle.disconnect.called) 

422 

423 @patch("ledger.protocol.get_tx_hash") 

424 @patch("ledger.protocol.get_unsigned_tx") 

425 @patch("comm.protocol.BIP32Path") 

426 def test_sign_authorized_segwit_timeout(self, BIP32PathMock, get_unsigned_tx_mock, _): 

427 BIP32PathMock.return_value = "the-key-id" 

428 self.dongle.sign_authorized.side_effect = HSM2DongleTimeoutError() 

429 get_unsigned_tx_mock.return_value = "the-unsigned-tx" 

430 

431 self.assertEqual( 

432 {"errorcode": -905}, 

433 self.protocol.handle_request({ 

434 "version": 5, 

435 "command": "sign", 

436 "keyId": "m/44'/1'/2'/3/4", 

437 "auth": { 

438 "receipt": "aa", 

439 "receipt_merkle_proof": ["cc", "dd"] 

440 }, 

441 "message": { 

442 "sighashComputationMode": "segwit", 

443 "tx": "eeff", 

444 "input": 12, 

445 "witnessScript": "aabbccddeeff", 

446 "outpointValue": 123000456 

447 }, 

448 }), 

449 ) 

450 

451 self.assertEqual( 

452 [ 

453 call( 

454 key_id="the-key-id", 

455 rsk_tx_receipt="aa", 

456 receipt_merkle_proof=["cc", "dd"], 

457 btc_tx="the-unsigned-tx", 

458 input_index=12, 

459 sighash_computation_mode=SighashComputationMode.SEGWIT, 

460 witness_script="aabbccddeeff", 

461 outpoint_value=123000456, 

462 ) 

463 ], 

464 self.dongle.sign_authorized.call_args_list, 

465 ) 

466 self.assertFalse(self.dongle.disconnect.called) 

467 

468 @patch("ledger.protocol.get_tx_hash") 

469 @patch("ledger.protocol.get_unsigned_tx") 

470 @patch("comm.protocol.BIP32Path") 

471 def test_sign_authorized_legacy_commerror_reconnection(self, BIP32PathMock, 

472 get_unsigned_tx_mock, _): 

473 BIP32PathMock.return_value = "the-key-id" 

474 self.dongle.sign_authorized.side_effect = HSM2DongleCommError() 

475 get_unsigned_tx_mock.return_value = "the-unsigned-tx" 

476 

477 self.assertEqual( 

478 {"errorcode": -905}, 

479 self.protocol.handle_request({ 

480 "version": 5, 

481 "command": "sign", 

482 "keyId": "m/44'/1'/2'/3/4", 

483 "auth": { 

484 "receipt": "aa", 

485 "receipt_merkle_proof": ["cc", "dd"] 

486 }, 

487 "message": { 

488 "sighashComputationMode": "legacy", 

489 "tx": "eeff", 

490 "input": 12 

491 }, 

492 }), 

493 ) 

494 

495 self.assertEqual( 

496 [ 

497 call( 

498 key_id="the-key-id", 

499 rsk_tx_receipt="aa", 

500 receipt_merkle_proof=["cc", "dd"], 

501 btc_tx="the-unsigned-tx", 

502 input_index=12, 

503 sighash_computation_mode=SighashComputationMode.LEGACY, 

504 witness_script=None, 

505 outpoint_value=None, 

506 ) 

507 ], 

508 self.dongle.sign_authorized.call_args_list, 

509 ) 

510 self.assertFalse(self.dongle.disconnect.called) 

511 

512 # Reconnection logic 

513 self.dongle.sign_authorized.side_effect = None 

514 signature = Mock(r="this-is-r", s="this-is-s") 

515 self.dongle.sign_authorized.return_value = (True, signature) 

516 

517 self.assertEqual( 

518 { 

519 "errorcode": 0, 

520 "signature": { 

521 "r": "this-is-r", 

522 "s": "this-is-s" 

523 } 

524 }, 

525 self.protocol.handle_request({ 

526 "version": 5, 

527 "command": "sign", 

528 "keyId": "m/44'/1'/2'/3/4", 

529 "auth": { 

530 "receipt": "aa", 

531 "receipt_merkle_proof": ["cc", "dd"] 

532 }, 

533 "message": { 

534 "sighashComputationMode": "legacy", 

535 "tx": "eeff", 

536 "input": 12 

537 }, 

538 }), 

539 ) 

540 

541 self._assert_reconnected() 

542 

543 @patch("ledger.protocol.get_tx_hash") 

544 @patch("ledger.protocol.get_unsigned_tx") 

545 @patch("comm.protocol.BIP32Path") 

546 def test_sign_authorized_segwit_commerror_reconnection(self, BIP32PathMock, 

547 get_unsigned_tx_mock, _): 

548 BIP32PathMock.return_value = "the-key-id" 

549 self.dongle.sign_authorized.side_effect = HSM2DongleCommError() 

550 get_unsigned_tx_mock.return_value = "the-unsigned-tx" 

551 

552 self.assertEqual( 

553 {"errorcode": -905}, 

554 self.protocol.handle_request({ 

555 "version": 5, 

556 "command": "sign", 

557 "keyId": "m/44'/1'/2'/3/4", 

558 "auth": { 

559 "receipt": "aa", 

560 "receipt_merkle_proof": ["cc", "dd"] 

561 }, 

562 "message": { 

563 "sighashComputationMode": "segwit", 

564 "tx": "eeff", 

565 "input": 12, 

566 "witnessScript": "aabbccddeeff", 

567 "outpointValue": 123000456 

568 }, 

569 }), 

570 ) 

571 

572 self.assertEqual( 

573 [ 

574 call( 

575 key_id="the-key-id", 

576 rsk_tx_receipt="aa", 

577 receipt_merkle_proof=["cc", "dd"], 

578 btc_tx="the-unsigned-tx", 

579 input_index=12, 

580 sighash_computation_mode=SighashComputationMode.SEGWIT, 

581 witness_script="aabbccddeeff", 

582 outpoint_value=123000456, 

583 ) 

584 ], 

585 self.dongle.sign_authorized.call_args_list, 

586 ) 

587 self.assertFalse(self.dongle.disconnect.called) 

588 

589 # Reconnection logic 

590 self.dongle.sign_authorized.side_effect = None 

591 signature = Mock(r="this-is-r", s="this-is-s") 

592 self.dongle.sign_authorized.return_value = (True, signature) 

593 

594 self.assertEqual( 

595 { 

596 "errorcode": 0, 

597 "signature": { 

598 "r": "this-is-r", 

599 "s": "this-is-s" 

600 } 

601 }, 

602 self.protocol.handle_request({ 

603 "version": 5, 

604 "command": "sign", 

605 "keyId": "m/44'/1'/2'/3/4", 

606 "auth": { 

607 "receipt": "aa", 

608 "receipt_merkle_proof": ["cc", "dd"] 

609 }, 

610 "message": { 

611 "sighashComputationMode": "segwit", 

612 "tx": "eeff", 

613 "input": 12, 

614 "witnessScript": "001122", 

615 "outpointValue": 567000111 

616 }, 

617 }), 

618 ) 

619 

620 self._assert_reconnected() 

621 

622 @patch("ledger.protocol.get_tx_hash") 

623 @patch("ledger.protocol.get_unsigned_tx") 

624 @patch("comm.protocol.BIP32Path") 

625 def test_sign_authorized_legacy_exception(self, BIP32PathMock, 

626 get_unsigned_tx_mock, _): 

627 BIP32PathMock.return_value = "the-key-id" 

628 self.dongle.sign_authorized.side_effect = HSM2DongleError() 

629 get_unsigned_tx_mock.return_value = "the-unsigned-tx" 

630 

631 with self.assertRaises(HSM2ProtocolError): 

632 self.protocol.handle_request({ 

633 "version": 5, 

634 "command": "sign", 

635 "keyId": "m/44'/1'/2'/3/4", 

636 "auth": { 

637 "receipt": "aa", 

638 "receipt_merkle_proof": ["cc", "dd"] 

639 }, 

640 "message": { 

641 "sighashComputationMode": "legacy", 

642 "tx": "eeff", 

643 "input": 12 

644 }, 

645 }) 

646 

647 self.assertEqual( 

648 [ 

649 call( 

650 key_id="the-key-id", 

651 rsk_tx_receipt="aa", 

652 receipt_merkle_proof=["cc", "dd"], 

653 btc_tx="the-unsigned-tx", 

654 input_index=12, 

655 sighash_computation_mode=SighashComputationMode.LEGACY, 

656 witness_script=None, 

657 outpoint_value=None, 

658 ) 

659 ], 

660 self.dongle.sign_authorized.call_args_list, 

661 ) 

662 self.assertFalse(self.dongle.disconnect.called) 

663 

664 @patch("ledger.protocol.get_tx_hash") 

665 @patch("ledger.protocol.get_unsigned_tx") 

666 @patch("comm.protocol.BIP32Path") 

667 def test_sign_authorized_segwit_exception(self, BIP32PathMock, 

668 get_unsigned_tx_mock, _): 

669 BIP32PathMock.return_value = "the-key-id" 

670 self.dongle.sign_authorized.side_effect = HSM2DongleError() 

671 get_unsigned_tx_mock.return_value = "the-unsigned-tx" 

672 

673 with self.assertRaises(HSM2ProtocolError): 

674 self.protocol.handle_request({ 

675 "version": 5, 

676 "command": "sign", 

677 "keyId": "m/44'/1'/2'/3/4", 

678 "auth": { 

679 "receipt": "aa", 

680 "receipt_merkle_proof": ["cc", "dd"] 

681 }, 

682 "message": { 

683 "sighashComputationMode": "segwit", 

684 "tx": "eeff", 

685 "input": 12, 

686 "witnessScript": "aabbccddeeff", 

687 "outpointValue": 123000456 

688 }, 

689 }) 

690 

691 self.assertEqual( 

692 [ 

693 call( 

694 key_id="the-key-id", 

695 rsk_tx_receipt="aa", 

696 receipt_merkle_proof=["cc", "dd"], 

697 btc_tx="the-unsigned-tx", 

698 input_index=12, 

699 sighash_computation_mode=SighashComputationMode.SEGWIT, 

700 witness_script="aabbccddeeff", 

701 outpoint_value=123000456, 

702 ) 

703 ], 

704 self.dongle.sign_authorized.call_args_list, 

705 ) 

706 self.assertFalse(self.dongle.disconnect.called) 

707 

708 @patch("ledger.protocol.get_tx_hash") 

709 @patch("ledger.protocol.get_unsigned_tx") 

710 @patch("comm.protocol.BIP32Path") 

711 def test_sign_authorized_legacy_error_unsigning(self, BIP32PathMock, 

712 get_unsigned_tx_mock, _): 

713 BIP32PathMock.return_value = "the-key-id" 

714 get_unsigned_tx_mock.side_effect = RuntimeError() 

715 

716 self.assertEqual( 

717 {"errorcode": -102}, 

718 self.protocol.handle_request({ 

719 "version": 5, 

720 "command": "sign", 

721 "keyId": "m/44'/1'/2'/3/4", 

722 "auth": { 

723 "receipt": "aa", 

724 "receipt_merkle_proof": ["cc", "dd"] 

725 }, 

726 "message": { 

727 "sighashComputationMode": "legacy", 

728 "tx": "eeff", 

729 "input": 12 

730 }, 

731 }), 

732 ) 

733 

734 self.assertFalse(self.dongle.sign_authorized.called) 

735 self.assertFalse(self.dongle.disconnect.called) 

736 

737 @patch("ledger.protocol.get_tx_hash") 

738 @patch("ledger.protocol.get_unsigned_tx") 

739 @patch("comm.protocol.BIP32Path") 

740 def test_sign_authorized_segwit_error_unsigning(self, BIP32PathMock, 

741 get_unsigned_tx_mock, _): 

742 BIP32PathMock.return_value = "the-key-id" 

743 get_unsigned_tx_mock.side_effect = RuntimeError() 

744 

745 self.assertEqual( 

746 {"errorcode": -102}, 

747 self.protocol.handle_request({ 

748 "version": 5, 

749 "command": "sign", 

750 "keyId": "m/44'/1'/2'/3/4", 

751 "auth": { 

752 "receipt": "aa", 

753 "receipt_merkle_proof": ["cc", "dd"] 

754 }, 

755 "message": { 

756 "sighashComputationMode": "segwit", 

757 "tx": "eeff", 

758 "input": 12, 

759 "witnessScript": "aabbccddeeff", 

760 "outpointValue": 123000456 

761 }, 

762 }), 

763 ) 

764 

765 self.assertFalse(self.dongle.sign_authorized.called) 

766 self.assertFalse(self.dongle.disconnect.called) 

767 

768 @patch("ledger.protocol.get_tx_hash") 

769 @patch("ledger.protocol.get_unsigned_tx") 

770 @patch("comm.protocol.BIP32Path") 

771 def test_sign_authorized_message_invalid(self, BIP32PathMock, get_unsigned_tx_mock, 

772 _): 

773 BIP32PathMock.return_value = "the-key-id" 

774 

775 self.assertEqual( 

776 {"errorcode": -102}, 

777 self.protocol.handle_request({ 

778 "version": 5, 

779 "command": "sign", 

780 "keyId": "m/44'/1'/2'/3/4", 

781 "auth": { 

782 "receipt": "aa", 

783 "receipt_merkle_proof": ["cc", "dd"] 

784 }, 

785 "message": { 

786 "sighashComputationMode": "legacy", 

787 "tx": "invalid", 

788 "input": 12 

789 }, 

790 }), 

791 ) 

792 

793 self.assertFalse(self.dongle.sign_authorized.called) 

794 self.assertFalse(get_unsigned_tx_mock.called) 

795 self.assertFalse(self.dongle.disconnect.called) 

796 

797 @patch("ledger.protocol.get_tx_hash") 

798 @patch("ledger.protocol.get_unsigned_tx") 

799 @patch("comm.protocol.BIP32Path") 

800 def test_sign_authorized_auth_invalid(self, BIP32PathMock, get_unsigned_tx_mock, _): 

801 BIP32PathMock.return_value = "the-key-id" 

802 

803 self.assertEqual( 

804 {"errorcode": -101}, 

805 self.protocol.handle_request({ 

806 "version": 5, 

807 "command": "sign", 

808 "keyId": "m/44'/1'/2'/3/4", 

809 "auth": { 

810 "receipt": "not-a-hex", 

811 "receipt_merkle_proof": ["cc", "dd"], 

812 }, 

813 "message": { 

814 "sighashComputationMode": "legacy", 

815 "tx": "eeff", 

816 "input": 12 

817 }, 

818 }), 

819 ) 

820 

821 self.assertFalse(self.dongle.sign_authorized.called) 

822 self.assertFalse(get_unsigned_tx_mock.called) 

823 self.assertFalse(self.dongle.disconnect.called) 

824 

825 @patch("comm.protocol.BIP32Path") 

826 def test_sign_unauthorized_ok(self, BIP32PathMock): 

827 BIP32PathMock.return_value = "the-key-id" 

828 signature = Mock(r="this-is-r", s="this-is-s") 

829 self.dongle.sign_unauthorized.return_value = (True, signature) 

830 

831 self.assertEqual( 

832 { 

833 "errorcode": 0, 

834 "signature": { 

835 "r": "this-is-r", 

836 "s": "this-is-s" 

837 } 

838 }, 

839 self.protocol.handle_request({ 

840 "version": 5, 

841 "command": "sign", 

842 "keyId": "m/44'/1'/2'/3/4", 

843 "message": { 

844 "hash": "aa"*32 

845 }, 

846 }), 

847 ) 

848 

849 self.assertEqual( 

850 [call(key_id="the-key-id", hash="aa"*32)], 

851 self.dongle.sign_unauthorized.call_args_list, 

852 ) 

853 self.assertFalse(self.dongle.disconnect.called) 

854 

855 @parameterized.expand([ 

856 ("path", -1, -103), 

857 ("hash", -5, -102), 

858 ("unexpected", -10, -905), 

859 ("unknown", -100, -906), 

860 ]) 

861 @patch("comm.protocol.BIP32Path") 

862 def test_sign_unauthorized_error(self, _, dongle_error_code, protocol_error_code, 

863 BIP32PathMock): 

864 BIP32PathMock.return_value = "the-key-id" 

865 self.dongle.sign_unauthorized.return_value = (False, dongle_error_code) 

866 

867 self.assertEqual( 

868 {"errorcode": protocol_error_code}, 

869 self.protocol.handle_request({ 

870 "version": 5, 

871 "command": "sign", 

872 "keyId": "m/44'/1'/2'/3/4", 

873 "message": { 

874 "hash": "aa"*32 

875 }, 

876 }), 

877 ) 

878 

879 self.assertEqual( 

880 [call(key_id="the-key-id", hash="aa"*32)], 

881 self.dongle.sign_unauthorized.call_args_list, 

882 ) 

883 self.assertFalse(self.dongle.disconnect.called) 

884 

885 @patch("comm.protocol.BIP32Path") 

886 def test_sign_unauthorized_timeout(self, BIP32PathMock): 

887 BIP32PathMock.return_value = "the-key-id" 

888 self.dongle.sign_unauthorized.side_effect = HSM2DongleTimeoutError() 

889 

890 self.assertEqual( 

891 {"errorcode": -905}, 

892 self.protocol.handle_request({ 

893 "version": 5, 

894 "command": "sign", 

895 "keyId": "m/44'/1'/2'/3/4", 

896 "message": { 

897 "hash": "aa"*32 

898 }, 

899 }), 

900 ) 

901 

902 self.assertEqual( 

903 [call(key_id="the-key-id", hash="aa"*32)], 

904 self.dongle.sign_unauthorized.call_args_list, 

905 ) 

906 self.assertFalse(self.dongle.disconnect.called) 

907 

908 @patch("comm.protocol.BIP32Path") 

909 def test_sign_unauthorized_commerror_reconnection(self, BIP32PathMock): 

910 BIP32PathMock.return_value = "the-key-id" 

911 self.dongle.sign_unauthorized.side_effect = HSM2DongleCommError() 

912 

913 self.assertEqual( 

914 {"errorcode": -905}, 

915 self.protocol.handle_request({ 

916 "version": 5, 

917 "command": "sign", 

918 "keyId": "m/44'/1'/2'/3/4", 

919 "message": { 

920 "hash": "aa"*32 

921 }, 

922 }), 

923 ) 

924 

925 self.assertEqual( 

926 [call(key_id="the-key-id", hash="aa"*32)], 

927 self.dongle.sign_unauthorized.call_args_list, 

928 ) 

929 self.assertFalse(self.dongle.disconnect.called) 

930 

931 # Reconnection logic 

932 self.dongle.sign_unauthorized.side_effect = None 

933 signature = Mock(r="this-is-r", s="this-is-s") 

934 self.dongle.sign_unauthorized.return_value = (True, signature) 

935 

936 self.assertEqual( 

937 { 

938 "errorcode": 0, 

939 "signature": { 

940 "r": "this-is-r", 

941 "s": "this-is-s" 

942 } 

943 }, 

944 self.protocol.handle_request({ 

945 "version": 5, 

946 "command": "sign", 

947 "keyId": "m/44'/1'/2'/3/4", 

948 "message": { 

949 "hash": "aa"*32 

950 }, 

951 }), 

952 ) 

953 

954 self._assert_reconnected() 

955 

956 @patch("comm.protocol.BIP32Path") 

957 def test_sign_unauthorized_exception(self, BIP32PathMock): 

958 BIP32PathMock.return_value = "the-key-id" 

959 self.dongle.sign_unauthorized.side_effect = HSM2DongleError() 

960 

961 with self.assertRaises(HSM2ProtocolError): 

962 self.protocol.handle_request({ 

963 "version": 5, 

964 "command": "sign", 

965 "keyId": "m/44'/1'/2'/3/4", 

966 "message": { 

967 "hash": "aa"*32 

968 }, 

969 }) 

970 

971 self.assertEqual( 

972 [call(key_id="the-key-id", hash="aa"*32)], 

973 self.dongle.sign_unauthorized.call_args_list, 

974 ) 

975 self.assertFalse(self.dongle.disconnect.called) 

976 

977 @patch("comm.protocol.BIP32Path") 

978 def test_sign_unauthorized_message_invalid(self, BIP32PathMock): 

979 BIP32PathMock.return_value = "the-key-id" 

980 

981 self.assertEqual( 

982 {"errorcode": -102}, 

983 self.protocol.handle_request({ 

984 "version": 5, 

985 "command": "sign", 

986 "keyId": "m/44'/1'/2'/3/4", 

987 "message": { 

988 "hash": "not-a-hexadecimal-string" 

989 }, 

990 }), 

991 ) 

992 

993 self.assertFalse(self.dongle.sign_unauthorized.called) 

994 self.assertFalse(self.dongle.disconnect.called) 

995 

996 def test_blockchain_state_ok(self): 

997 self.dongle.get_blockchain_state.return_value = { 

998 "best_block": "the-best-block", 

999 "newest_valid_block": "the-newest_valid_block", 

1000 "ancestor_block": "the-ancestor-block", 

1001 "ancestor_receipts_root": "the-ancestor-receipts-root", 

1002 "updating.best_block": "the-updating-best-block", 

1003 "updating.newest_valid_block": "the-updating-newest-valid-block", 

1004 "updating.next_expected_block": "the-updating-next-expected-block", 

1005 "updating.total_difficulty": "total-difficulty", 

1006 "updating.in_progress": "is-in-progress", 

1007 "updating.already_validated": "is-already-validated", 

1008 "updating.found_best_block": "have-found-best-block", 

1009 } 

1010 

1011 self.assertEqual( 

1012 { 

1013 "errorcode": 0, 

1014 "state": { 

1015 "best_block": "the-best-block", 

1016 "newest_valid_block": "the-newest_valid_block", 

1017 "ancestor_block": "the-ancestor-block", 

1018 "ancestor_receipts_root": "the-ancestor-receipts-root", 

1019 "updating": { 

1020 "best_block": "the-updating-best-block", 

1021 "newest_valid_block": "the-updating-newest-valid-block", 

1022 "next_expected_block": "the-updating-next-expected-block", 

1023 "total_difficulty": "total-difficulty", 

1024 "in_progress": "is-in-progress", 

1025 "already_validated": "is-already-validated", 

1026 "found_best_block": "have-found-best-block", 

1027 }, 

1028 }, 

1029 }, 

1030 self.protocol.handle_request({ 

1031 "version": 5, 

1032 "command": "blockchainState" 

1033 }), 

1034 ) 

1035 

1036 self.assertEqual([call()], self.dongle.get_blockchain_state.call_args_list) 

1037 self.assertFalse(self.dongle.disconnect.called) 

1038 

1039 def test_blockchain_state_dongle_exception(self): 

1040 self.dongle.get_blockchain_state.side_effect = HSM2DongleError("an-error") 

1041 

1042 self.assertEqual( 

1043 {"errorcode": -905}, 

1044 self.protocol.handle_request({ 

1045 "version": 5, 

1046 "command": "blockchainState" 

1047 }), 

1048 ) 

1049 

1050 self.assertEqual([call()], self.dongle.get_blockchain_state.call_args_list) 

1051 self.assertFalse(self.dongle.disconnect.called) 

1052 

1053 def test_blockchain_state_dongle_timeout(self): 

1054 self.dongle.get_blockchain_state.side_effect = HSM2DongleTimeoutError() 

1055 

1056 self.assertEqual( 

1057 {"errorcode": -905}, 

1058 self.protocol.handle_request({ 

1059 "version": 5, 

1060 "command": "blockchainState" 

1061 }), 

1062 ) 

1063 

1064 self.assertEqual([call()], self.dongle.get_blockchain_state.call_args_list) 

1065 self.assertFalse(self.dongle.disconnect.called) 

1066 

1067 def test_blockchain_state_dongle_commerror_reconnection(self): 

1068 self.dongle.get_blockchain_state.side_effect = HSM2DongleCommError() 

1069 

1070 self.assertEqual( 

1071 {"errorcode": -905}, 

1072 self.protocol.handle_request({ 

1073 "version": 5, 

1074 "command": "blockchainState" 

1075 }), 

1076 ) 

1077 

1078 self.assertEqual([call()], self.dongle.get_blockchain_state.call_args_list) 

1079 self.assertFalse(self.dongle.disconnect.called) 

1080 

1081 # Reconnection logic 

1082 self.dongle.get_blockchain_state.side_effect = None 

1083 self.dongle.get_blockchain_state.return_value = { 

1084 "best_block": "the-best-block", 

1085 "newest_valid_block": "the-newest_valid_block", 

1086 "ancestor_block": "the-ancestor-block", 

1087 "ancestor_receipts_root": "the-ancestor-receipts-root", 

1088 "updating.best_block": "the-updating-best-block", 

1089 "updating.newest_valid_block": "the-updating-newest-valid-block", 

1090 "updating.next_expected_block": "the-updating-next-expected-block", 

1091 "updating.total_difficulty": "total-difficulty", 

1092 "updating.in_progress": "is-in-progress", 

1093 "updating.already_validated": "is-already-validated", 

1094 "updating.found_best_block": "have-found-best-block", 

1095 } 

1096 

1097 self.assertEqual( 

1098 { 

1099 "errorcode": 0, 

1100 "state": { 

1101 "best_block": "the-best-block", 

1102 "newest_valid_block": "the-newest_valid_block", 

1103 "ancestor_block": "the-ancestor-block", 

1104 "ancestor_receipts_root": "the-ancestor-receipts-root", 

1105 "updating": { 

1106 "best_block": "the-updating-best-block", 

1107 "newest_valid_block": "the-updating-newest-valid-block", 

1108 "next_expected_block": "the-updating-next-expected-block", 

1109 "total_difficulty": "total-difficulty", 

1110 "in_progress": "is-in-progress", 

1111 "already_validated": "is-already-validated", 

1112 "found_best_block": "have-found-best-block", 

1113 }, 

1114 }, 

1115 }, 

1116 self.protocol.handle_request({ 

1117 "version": 5, 

1118 "command": "blockchainState" 

1119 }), 

1120 ) 

1121 

1122 self._assert_reconnected() 

1123 

1124 def test_reset_advance_blockchain_ok(self): 

1125 self.assertEqual( 

1126 {"errorcode": 0}, 

1127 self.protocol.handle_request({ 

1128 "version": 5, 

1129 "command": "resetAdvanceBlockchain" 

1130 }), 

1131 ) 

1132 

1133 self.assertEqual([call()], self.dongle.reset_advance_blockchain.call_args_list) 

1134 self.assertFalse(self.dongle.disconnect.called) 

1135 

1136 def test_reset_advance_blockchain_dongle_timeout(self): 

1137 self.dongle.reset_advance_blockchain.side_effect = HSM2DongleTimeoutError() 

1138 

1139 self.assertEqual( 

1140 {"errorcode": -905}, 

1141 self.protocol.handle_request({ 

1142 "version": 5, 

1143 "command": "resetAdvanceBlockchain" 

1144 }), 

1145 ) 

1146 

1147 self.assertEqual([call()], self.dongle.reset_advance_blockchain.call_args_list) 

1148 self.assertFalse(self.dongle.disconnect.called) 

1149 

1150 def test_reset_advance_blockchain_dongle_commerror_reconnection(self): 

1151 self.dongle.reset_advance_blockchain.side_effect = HSM2DongleCommError() 

1152 

1153 self.assertEqual( 

1154 {"errorcode": -905}, 

1155 self.protocol.handle_request({ 

1156 "version": 5, 

1157 "command": "resetAdvanceBlockchain" 

1158 }), 

1159 ) 

1160 

1161 self.assertEqual([call()], self.dongle.reset_advance_blockchain.call_args_list) 

1162 self.assertFalse(self.dongle.disconnect.called) 

1163 

1164 # Reconnection logic 

1165 self.dongle.reset_advance_blockchain.side_effect = None 

1166 

1167 self.assertEqual( 

1168 {"errorcode": 0}, 

1169 self.protocol.handle_request({ 

1170 "version": 5, 

1171 "command": "resetAdvanceBlockchain" 

1172 }), 

1173 ) 

1174 

1175 self._assert_reconnected() 

1176 

1177 def test_reset_advance_blockchain_dongle_exception(self): 

1178 self.dongle.reset_advance_blockchain.side_effect = HSM2DongleError("an-error") 

1179 

1180 self.assertEqual( 

1181 {"errorcode": -905}, 

1182 self.protocol.handle_request({ 

1183 "version": 5, 

1184 "command": "resetAdvanceBlockchain" 

1185 }), 

1186 ) 

1187 

1188 self.assertEqual([call()], self.dongle.reset_advance_blockchain.call_args_list) 

1189 self.assertFalse(self.dongle.disconnect.called) 

1190 

1191 @parameterized.expand([ 

1192 ("success", (True, 1), 0), 

1193 ("partial_success", (True, 2), 1), 

1194 ("init", (False, -1), -905), 

1195 ("compute_metadata", (False, -2), -204), 

1196 ("metadata", (False, -3), -905), 

1197 ("block_data", (False, -4), -905), 

1198 ("invalid_block", (False, -5), -204), 

1199 ("pow_invalid", (False, -6), -202), 

1200 ("chaining_mismatch", (False, -7), -201), 

1201 ("unsupported_chain", (False, -8), -204), 

1202 ("invalid_brothers", (False, -9), -205), 

1203 ("unexpected", (False, -10), -906), 

1204 ("unknown", (False, 999), -906), 

1205 ]) 

1206 def test_advance_blockchain_mapping(self, _, response, expected_code): 

1207 self.dongle.advance_blockchain.return_value = response 

1208 self.assertEqual( 

1209 {"errorcode": expected_code}, 

1210 self.protocol.handle_request({ 

1211 "version": 5, 

1212 "command": "advanceBlockchain", 

1213 "blocks": ["aabbcc", "ddeeff"], 

1214 "brothers": [["bb11"], ["bb21", "bb22"]], 

1215 }), 

1216 ) 

1217 

1218 self.assertEqual( 

1219 [call( 

1220 ["aabbcc", "ddeeff"], 

1221 [["bb11"], ["bb21", "bb22"]], 

1222 )], 

1223 self.dongle.advance_blockchain.call_args_list, 

1224 ) 

1225 self.assertFalse(self.dongle.disconnect.called) 

1226 

1227 def test_advance_blockchain_timeout(self): 

1228 self.dongle.advance_blockchain.side_effect = HSM2DongleTimeoutError() 

1229 

1230 self.assertEqual( 

1231 {"errorcode": -905}, 

1232 self.protocol.handle_request({ 

1233 "version": 5, 

1234 "command": "advanceBlockchain", 

1235 "blocks": ["aabbcc", "ddeeff"], 

1236 "brothers": [["bb11", "bb12", "bb13"], ["bb21", "bb22"]], 

1237 }), 

1238 ) 

1239 

1240 self.assertEqual( 

1241 [call( 

1242 ["aabbcc", "ddeeff"], 

1243 [["bb11", "bb12", "bb13"], ["bb21", "bb22"]], 

1244 )], 

1245 self.dongle.advance_blockchain.call_args_list, 

1246 ) 

1247 self.assertFalse(self.dongle.disconnect.called) 

1248 

1249 def test_advance_blockchain_commerror_reconnection(self): 

1250 self.dongle.advance_blockchain.side_effect = HSM2DongleCommError() 

1251 

1252 self.assertEqual( 

1253 {"errorcode": -905}, 

1254 self.protocol.handle_request({ 

1255 "version": 5, 

1256 "command": "advanceBlockchain", 

1257 "blocks": ["aabbcc", "ddeeff"], 

1258 "brothers": [["bb11", "bb12", "bb13"], ["bb21", "bb22"]], 

1259 }), 

1260 ) 

1261 

1262 self.assertEqual( 

1263 [call( 

1264 ["aabbcc", "ddeeff"], 

1265 [["bb11", "bb12", "bb13"], ["bb21", "bb22"]], 

1266 )], 

1267 self.dongle.advance_blockchain.call_args_list, 

1268 ) 

1269 self.assertFalse(self.dongle.disconnect.called) 

1270 

1271 # Reconnection logic 

1272 self.dongle.advance_blockchain.side_effect = None 

1273 self.dongle.advance_blockchain.return_value = (True, 1) 

1274 self.assertEqual( 

1275 {"errorcode": 0}, 

1276 self.protocol.handle_request({ 

1277 "version": 5, 

1278 "command": "advanceBlockchain", 

1279 "blocks": ["aabbcc", "ddeeff"], 

1280 "brothers": [["bb11", "bb12", "bb13"], ["bb21", "bb22"]], 

1281 }), 

1282 ) 

1283 

1284 self._assert_reconnected() 

1285 

1286 def test_advance_blockchain_exception(self): 

1287 self.dongle.advance_blockchain.side_effect = HSM2DongleError("a-message") 

1288 

1289 self.assertEqual( 

1290 {"errorcode": -905}, 

1291 self.protocol.handle_request({ 

1292 "version": 5, 

1293 "command": "advanceBlockchain", 

1294 "blocks": ["aabbcc", "ddeeff"], 

1295 "brothers": [["bb11", "bb12", "bb13"], ["bb21", "bb22"]], 

1296 }), 

1297 ) 

1298 

1299 self.assertEqual( 

1300 [call( 

1301 ["aabbcc", "ddeeff"], 

1302 [["bb11", "bb12", "bb13"], ["bb21", "bb22"]], 

1303 )], 

1304 self.dongle.advance_blockchain.call_args_list, 

1305 ) 

1306 self.assertFalse(self.dongle.disconnect.called) 

1307 

1308 @parameterized.expand([ 

1309 ("success", (True, 1), 0), 

1310 ("init", (False, -1), -905), 

1311 ("compute_metadata", (False, -2), -204), 

1312 ("metadata", (False, -3), -905), 

1313 ("block_data", (False, -4), -905), 

1314 ("invalid_block", (False, -5), -204), 

1315 ("chaining_mismatch", (False, -6), -201), 

1316 ("tip_mismatch", (False, -7), -203), 

1317 ("remove_mm_fields", (False, -8), -204), 

1318 ("unexpected", (False, -10), -906), 

1319 ("unknown", (False, 999), -906), 

1320 ]) 

1321 def test_update_ancestor_mapping(self, _, response, expected_code): 

1322 self.dongle.update_ancestor.return_value = response 

1323 self.assertEqual( 

1324 {"errorcode": expected_code}, 

1325 self.protocol.handle_request({ 

1326 "version": 5, 

1327 "command": "updateAncestorBlock", 

1328 "blocks": ["aabbcc", "ddeeff"], 

1329 }), 

1330 ) 

1331 

1332 self.assertEqual( 

1333 [call(["aabbcc", "ddeeff"])], 

1334 self.dongle.update_ancestor.call_args_list, 

1335 ) 

1336 self.assertFalse(self.dongle.disconnect.called) 

1337 

1338 def test_update_ancestor_timeout(self): 

1339 self.dongle.update_ancestor.side_effect = HSM2DongleTimeoutError() 

1340 

1341 self.assertEqual( 

1342 {"errorcode": -905}, 

1343 self.protocol.handle_request({ 

1344 "version": 5, 

1345 "command": "updateAncestorBlock", 

1346 "blocks": ["aabbcc", "ddeeff"], 

1347 }), 

1348 ) 

1349 

1350 self.assertEqual( 

1351 [call(["aabbcc", "ddeeff"])], 

1352 self.dongle.update_ancestor.call_args_list, 

1353 ) 

1354 self.assertFalse(self.dongle.disconnect.called) 

1355 

1356 def test_update_ancestor_commerror_reconnection(self): 

1357 self.dongle.update_ancestor.side_effect = HSM2DongleCommError() 

1358 

1359 self.assertEqual( 

1360 {"errorcode": -905}, 

1361 self.protocol.handle_request({ 

1362 "version": 5, 

1363 "command": "updateAncestorBlock", 

1364 "blocks": ["aabbcc", "ddeeff"], 

1365 }), 

1366 ) 

1367 

1368 self.assertEqual( 

1369 [call(["aabbcc", "ddeeff"])], 

1370 self.dongle.update_ancestor.call_args_list, 

1371 ) 

1372 self.assertFalse(self.dongle.disconnect.called) 

1373 

1374 # Reconnection logic 

1375 self.dongle.update_ancestor.side_effect = None 

1376 self.dongle.update_ancestor.return_value = (True, 1) 

1377 self.assertEqual( 

1378 {"errorcode": 0}, 

1379 self.protocol.handle_request({ 

1380 "version": 5, 

1381 "command": "updateAncestorBlock", 

1382 "blocks": ["aabbcc", "ddeeff"], 

1383 }), 

1384 ) 

1385 

1386 self._assert_reconnected() 

1387 

1388 def test_update_ancestor_exception(self): 

1389 self.dongle.update_ancestor.side_effect = HSM2DongleError("a-message") 

1390 

1391 self.assertEqual( 

1392 {"errorcode": -905}, 

1393 self.protocol.handle_request({ 

1394 "version": 5, 

1395 "command": "updateAncestorBlock", 

1396 "blocks": ["aabbcc", "ddeeff"], 

1397 }), 

1398 ) 

1399 

1400 self.assertEqual( 

1401 [call(["aabbcc", "ddeeff"])], 

1402 self.dongle.update_ancestor.call_args_list, 

1403 ) 

1404 self.assertFalse(self.dongle.disconnect.called) 

1405 

1406 def test_get_blockchain_parameters_ok(self): 

1407 self.dongle.get_signer_parameters.return_value = HSM2FirmwareParameters( 

1408 0x32, 

1409 "the-checkpoint", 

1410 HSM2FirmwareParameters.Network.MAINNET 

1411 ) 

1412 

1413 self.assertEqual( 

1414 { 

1415 "errorcode": 0, 

1416 "parameters": { 

1417 "checkpoint": "the-checkpoint", 

1418 "minimum_difficulty": 0x32, 

1419 "network": "mainnet", 

1420 }, 

1421 }, 

1422 self.protocol.handle_request({ 

1423 "version": 5, 

1424 "command": "blockchainParameters" 

1425 }), 

1426 ) 

1427 

1428 def test_get_blockchain_parameters_dongle_timeout(self): 

1429 self.dongle.get_signer_parameters.side_effect = HSM2DongleTimeoutError() 

1430 

1431 self.assertEqual( 

1432 {"errorcode": -905}, 

1433 self.protocol.handle_request({ 

1434 "version": 5, 

1435 "command": "blockchainParameters" 

1436 }), 

1437 ) 

1438 

1439 def test_get_blockchain_parameters_exception(self): 

1440 self.dongle.get_signer_parameters.side_effect = HSM2DongleError("a-message") 

1441 

1442 self.assertEqual( 

1443 {"errorcode": -905}, 

1444 self.protocol.handle_request({ 

1445 "version": 5, 

1446 "command": "blockchainParameters" 

1447 }), 

1448 ) 

1449 

1450 def test_signer_heartbeat_ok(self): 

1451 self.dongle.get_signer_heartbeat.side_effect = lambda ud: (True, { 

1452 "pubKey": "66778899", 

1453 "message": "aabbccdd" + ud, 

1454 "signature": Mock(r="this-is-r", s="this-is-s"), 

1455 "tweak": "1122334455", 

1456 }) 

1457 

1458 self.assertEqual( 

1459 { 

1460 "errorcode": 0, 

1461 "pubKey": "66778899", 

1462 "message": "aabbccdd" + "77"*16, 

1463 "tweak": "1122334455", 

1464 "signature": { 

1465 "r": "this-is-r", 

1466 "s": "this-is-s", 

1467 }, 

1468 }, 

1469 self.protocol.handle_request({ 

1470 "version": 5, 

1471 "command": "signerHeartbeat", 

1472 "udValue": "77"*16, 

1473 }), 

1474 ) 

1475 

1476 def test_signer_heartbeat_dongle_error(self): 

1477 self.dongle.get_signer_heartbeat.return_value = (False, ) 

1478 

1479 self.assertEqual( 

1480 {"errorcode": -905}, 

1481 self.protocol.handle_request({ 

1482 "version": 5, 

1483 "command": "signerHeartbeat", 

1484 "udValue": "99"*16, 

1485 }), 

1486 ) 

1487 

1488 def test_signer_heartbeat_dongle_timeout(self): 

1489 self.dongle.get_signer_heartbeat.side_effect = HSM2DongleTimeoutError() 

1490 

1491 self.assertEqual( 

1492 {"errorcode": -905}, 

1493 self.protocol.handle_request({ 

1494 "version": 5, 

1495 "command": "signerHeartbeat", 

1496 "udValue": "11"*16, 

1497 }), 

1498 ) 

1499 

1500 def test_signer_heartbeat_exception(self): 

1501 self.dongle.get_signer_heartbeat.side_effect = HSM2DongleError("a-message") 

1502 

1503 self.assertEqual( 

1504 {"errorcode": -905}, 

1505 self.protocol.handle_request({ 

1506 "version": 5, 

1507 "command": "signerHeartbeat", 

1508 "udValue": "22"*16, 

1509 }), 

1510 ) 

1511 

1512 @patch("time.sleep") 

1513 def test_ui_heartbeat_from_signer_ok(self, sleep_mock): 

1514 self.dongle.get_current_mode.side_effect = [ 

1515 self.dongle.MODE.SIGNER, 

1516 self.dongle.MODE.UI_HEARTBEAT, 

1517 self.dongle.MODE.SIGNER, 

1518 ] 

1519 

1520 self.dongle.exit_app.side_effect = [ 

1521 HSM2DongleCommError(""), 

1522 HSM2DongleCommError(""), 

1523 ] 

1524 

1525 self.dongle.get_ui_heartbeat.side_effect = lambda ud: (True, { 

1526 "pubKey": "66778899", 

1527 "message": "aabbccdd" + ud, 

1528 "signature": Mock(r="this-is-r", s="this-is-s"), 

1529 "tweak": "1122334455", 

1530 }) 

1531 

1532 self.assertEqual( 

1533 { 

1534 "errorcode": 0, 

1535 "pubKey": "66778899", 

1536 "message": "aabbccdd" + "77"*32, 

1537 "tweak": "1122334455", 

1538 "signature": { 

1539 "r": "this-is-r", 

1540 "s": "this-is-s", 

1541 }, 

1542 }, 

1543 self.protocol.handle_request({ 

1544 "version": 5, 

1545 "command": "uiHeartbeat", 

1546 "udValue": "77"*32, 

1547 }), 

1548 ) 

1549 

1550 @patch("time.sleep") 

1551 def test_ui_heartbeat_exit_signer_error(self, sleep_mock): 

1552 self.dongle.get_current_mode.return_value = self.dongle.MODE.SIGNER 

1553 

1554 self.dongle.exit_app.side_effect = HSM2DongleError("") 

1555 

1556 self.assertEqual({"errorcode": -905}, 

1557 self.protocol.handle_request({ 

1558 "version": 5, 

1559 "command": "uiHeartbeat", 

1560 "udValue": "77"*32, 

1561 })) 

1562 

1563 @patch("time.sleep") 

1564 def test_ui_heartbeat_from_signer_no_ui_heartbeat(self, sleep_mock): 

1565 self.dongle.get_current_mode.side_effect = [ 

1566 self.dongle.MODE.SIGNER, 

1567 self.dongle.MODE.BOOTLOADER, 

1568 ] 

1569 

1570 self.dongle.exit_app.side_effect = HSM2DongleCommError("") 

1571 

1572 self.assertEqual({"errorcode": -905}, 

1573 self.protocol.handle_request({ 

1574 "version": 5, 

1575 "command": "uiHeartbeat", 

1576 "udValue": "77"*32, 

1577 })) 

1578 

1579 @patch("time.sleep") 

1580 def test_ui_heartbeat_from_signer_hb_error(self, sleep_mock): 

1581 self.dongle.get_current_mode.side_effect = [ 

1582 self.dongle.MODE.SIGNER, 

1583 self.dongle.MODE.UI_HEARTBEAT, 

1584 ] 

1585 

1586 self.dongle.exit_app.side_effect = HSM2DongleCommError("") 

1587 self.dongle.get_ui_heartbeat.side_effect = HSM2DongleError("") 

1588 

1589 self.assertEqual({"errorcode": -905}, 

1590 self.protocol.handle_request({ 

1591 "version": 5, 

1592 "command": "uiHeartbeat", 

1593 "udValue": "77"*32, 

1594 })) 

1595 

1596 self.assertTrue(self.dongle.get_ui_heartbeat.called) 

1597 

1598 @patch("time.sleep") 

1599 def test_ui_heartbeat_from_signer_hb_error_result(self, sleep_mock): 

1600 self.dongle.get_current_mode.side_effect = [ 

1601 self.dongle.MODE.SIGNER, 

1602 self.dongle.MODE.UI_HEARTBEAT, 

1603 self.dongle.MODE.SIGNER, 

1604 ] 

1605 

1606 self.dongle.exit_app.side_effect = [ 

1607 HSM2DongleCommError(""), 

1608 HSM2DongleCommError(""), 

1609 ] 

1610 

1611 self.dongle.get_ui_heartbeat.return_value = (False, ) 

1612 

1613 self.assertEqual({"errorcode": -905}, 

1614 self.protocol.handle_request({ 

1615 "version": 5, 

1616 "command": "uiHeartbeat", 

1617 "udValue": "77"*32, 

1618 })) 

1619 

1620 self.assertTrue(self.dongle.get_ui_heartbeat.called) 

1621 

1622 @patch("time.sleep") 

1623 def test_ui_heartbeat_from_signer_back_to_signer_error(self, sleep_mock): 

1624 self.dongle.get_current_mode.side_effect = [ 

1625 self.dongle.MODE.SIGNER, 

1626 self.dongle.MODE.UI_HEARTBEAT, 

1627 ] 

1628 

1629 self.dongle.exit_app.side_effect = [ 

1630 HSM2DongleCommError(""), 

1631 HSM2DongleError(""), 

1632 ] 

1633 

1634 self.dongle.get_ui_heartbeat.side_effect = lambda ud: (True, { 

1635 "pubKey": "66778899", 

1636 "message": "aabbccdd" + ud, 

1637 "signature": Mock(r="this-is-r", s="this-is-s"), 

1638 "tweak": "1122334455", 

1639 }) 

1640 

1641 self.assertEqual({"errorcode": -905}, 

1642 self.protocol.handle_request({ 

1643 "version": 5, 

1644 "command": "uiHeartbeat", 

1645 "udValue": "77"*32, 

1646 })) 

1647 

1648 self.assertTrue(self.dongle.get_ui_heartbeat.called) 

1649 

1650 @patch("time.sleep") 

1651 def test_ui_heartbeat_from_signer_no_back_to_signer(self, sleep_mock): 

1652 self.dongle.get_current_mode.side_effect = [ 

1653 self.dongle.MODE.SIGNER, 

1654 self.dongle.MODE.UI_HEARTBEAT, 

1655 self.dongle.MODE.UI_HEARTBEAT, 

1656 ] 

1657 

1658 self.dongle.exit_app.side_effect = [ 

1659 HSM2DongleCommError(""), 

1660 HSM2DongleCommError(""), 

1661 ] 

1662 

1663 self.dongle.get_ui_heartbeat.side_effect = lambda ud: (True, { 

1664 "pubKey": "66778899", 

1665 "message": "aabbccdd" + ud, 

1666 "signature": Mock(r="this-is-r", s="this-is-s"), 

1667 "tweak": "1122334455", 

1668 }) 

1669 

1670 self.assertEqual({"errorcode": -905}, 

1671 self.protocol.handle_request({ 

1672 "version": 5, 

1673 "command": "uiHeartbeat", 

1674 "udValue": "77"*32, 

1675 })) 

1676 

1677 self.assertTrue(self.dongle.get_ui_heartbeat.called) 

1678 

1679 @patch("time.sleep") 

1680 def test_ui_heartbeat_from_invalid_start_mode(self, sleep_mock): 

1681 self.dongle.get_current_mode.return_value = self.dongle.MODE.BOOTLOADER 

1682 

1683 self.assertEqual({"errorcode": -905}, 

1684 self.protocol.handle_request({ 

1685 "version": 5, 

1686 "command": "uiHeartbeat", 

1687 "udValue": "77"*32, 

1688 })) 

1689 

1690 def test_ui_heartbeat_from_hb_ok(self): 

1691 self.dongle.get_current_mode.return_value = self.dongle.MODE.UI_HEARTBEAT 

1692 

1693 self.dongle.get_ui_heartbeat.side_effect = lambda ud: (True, { 

1694 "pubKey": "66778899", 

1695 "message": "aabbccdd" + ud, 

1696 "signature": Mock(r="this-is-r", s="this-is-s"), 

1697 "tweak": "1122334455", 

1698 }) 

1699 

1700 self.assertEqual( 

1701 { 

1702 "errorcode": 0, 

1703 "pubKey": "66778899", 

1704 "message": "aabbccdd" + "77"*32, 

1705 "tweak": "1122334455", 

1706 "signature": { 

1707 "r": "this-is-r", 

1708 "s": "this-is-s", 

1709 }, 

1710 }, 

1711 self.protocol.handle_request({ 

1712 "version": 5, 

1713 "command": "uiHeartbeat", 

1714 "udValue": "77"*32, 

1715 }), 

1716 ) 

1717 

1718 self.assertFalse(self.dongle.exit_app.called) 

1719 

1720 def test_ui_heartbeat_from_hb_hb_error(self): 

1721 self.dongle.get_current_mode.return_value = self.dongle.MODE.UI_HEARTBEAT 

1722 

1723 self.dongle.get_ui_heartbeat.side_effect = HSM2DongleError() 

1724 

1725 self.assertEqual({"errorcode": -905}, 

1726 self.protocol.handle_request({ 

1727 "version": 5, 

1728 "command": "uiHeartbeat", 

1729 "udValue": "77"*32, 

1730 })) 

1731 

1732 self.assertFalse(self.dongle.exit_app.called) 

1733 

1734 def test_ui_heartbeat_from_hb_hb_error_result(self): 

1735 self.dongle.get_current_mode.return_value = self.dongle.MODE.UI_HEARTBEAT 

1736 

1737 self.dongle.get_ui_heartbeat.return_value = (False, ) 

1738 

1739 self.assertEqual({"errorcode": -905}, 

1740 self.protocol.handle_request({ 

1741 "version": 5, 

1742 "command": "uiHeartbeat", 

1743 "udValue": "77"*32, 

1744 })) 

1745 

1746 self.assertFalse(self.dongle.exit_app.called) 

1747 

1748 def _assert_reconnected(self): 

1749 self.assertTrue(self.dongle.disconnect.called) 

1750 self.assertEqual(2, self.dongle.connect.call_count)