Coverage for tests/ledger/test_hsm2dongle.py: 98%

306 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23from unittest import TestCase 

24from unittest.mock import Mock, patch, call 

25from parameterized import parameterized 

26from ledger.hsm2dongle import ( 

27 HSM2Dongle, 

28 HSM2DongleError, 

29 HSM2DongleCommError, 

30 HSM2DongleTimeoutError, 

31 HSM2DongleErrorResult, 

32) 

33from sgx.hsm2dongle import HSM2DongleSGX 

34from ledger.version import HSM2FirmwareVersion 

35from ledgerblue.commException import CommException 

36from enum import Enum 

37 

38import logging 

39 

40logging.disable(logging.CRITICAL) 

41 

42 

43class HSM2DongleTestMode(Enum): 

44 Ledger = "ledger" 

45 SGX = "sgx" 

46 

47 

48class TestHSM2DongleBase(TestCase): 

49 DONGLE_EXCHANGE_TIMEOUT = 10 

50 

51 CHUNK_ERROR_MAPPINGS = [ 

52 ("prot_invalid", 0x6B87, -4), 

53 ("rlp_invalid", 0x6B88, -5), 

54 ("block_too_old", 0x6B89, -5), 

55 ("block_too_short", 0x6B8A, -5), 

56 ("parent_hash_invalid", 0x6B8B, -5), 

57 ("block_num_invalid", 0x6B8D, -5), 

58 ("block_diff_invalid", 0x6B8E, -5), 

59 ("umm_root_invalid", 0x6B8F, -5), 

60 ("btc_header_invalid", 0x6B90, -5), 

61 ("merkle_proof_invalid", 0x6B91, -5), 

62 ("btc_cb_txn_invalid", 0x6B92, -6), 

63 ("mm_rlp_len_mismatch", 0x6B93, -5), 

64 ("btc_diff_mismatch", 0x6B94, -6), 

65 ("merkle_proof_mismatch", 0x6B95, -6), 

66 ("mm_hash_mismatch", 0x6B96, -6), 

67 ("merkle_proof_overflow", 0x6B97, -5), 

68 ("cb_txn_overflow", 0x6B98, -5), 

69 ("buffer_overflow", 0x6B99, -5), 

70 ("chain_mismatch", 0x6B9A, -7), 

71 ("total_diff_overflow", 0x6B9B, -8), 

72 ("cb_txn_hash_mismatch", 0x6B9D, -6), 

73 ("brothers_too_many", 0x6B9E, -9), 

74 ("brother_parent_mismatch", 0x6B9F, -9), 

75 ("brother_same_as_block", 0x6BA0, -9), 

76 ("brother_order_invalid", 0x6BA1, -9), 

77 ("unexpected", 0x6BFF, -10), 

78 ("error_response", bytes([0, 0, 0xFF]), -10), 

79 ] 

80 

81 def get_test_mode(self): 

82 return HSM2DongleTestMode.Ledger 

83 

84 @patch("ledger.hsm2dongle_tcp.getDongle") 

85 @patch("ledger.hsm2dongle.getDongle") 

86 def setUp(self, getDongleMock, getDongleTCPMock): 

87 if self.get_test_mode() == HSM2DongleTestMode.Ledger: 

88 self.dongle = Mock() 

89 self.getDongleMock = getDongleMock 

90 self.getDongleMock.return_value = self.dongle 

91 self.hsm2dongle = HSM2Dongle("a-debug-value") 

92 self.getDongleMock.assert_not_called() 

93 self.hsm2dongle.connect() 

94 self.getDongleMock.assert_called_with("a-debug-value") 

95 getDongleTCPMock.assert_not_called() 

96 elif self.get_test_mode() == HSM2DongleTestMode.SGX: 

97 self.dongle = Mock() 

98 self.getDongleMock = getDongleTCPMock 

99 self.getDongleMock.return_value = self.dongle 

100 self.hsm2dongle = HSM2DongleSGX("a-host", 1234, "a-debug-value") 

101 

102 self.getDongleMock.assert_not_called() 

103 self.hsm2dongle.connect() 

104 self.getDongleMock.assert_called_with("a-host", 1234, "a-debug-value") 

105 self.assertEqual(self.hsm2dongle.dongle, self.dongle) 

106 getDongleMock.assert_not_called() 

107 else: 

108 raise RuntimeError(f"Unknown test mode: {self.get_test_mode()}") 

109 

110 def buf(self, size): 

111 return bytes(map(lambda b: b % 256, range(size))) 

112 

113 def parse_exchange_spec(self, spec, stop=None, replace=None): 

114 rqs = [] 

115 rps = [] 

116 rq = True 

117 stopped = False 

118 for line in spec: 

119 delim = ">" if rq else "<" 

120 delim_pos = line.find(delim) 

121 if delim_pos == -1: 

122 raise RuntimeError("Invalid spec prefix") 

123 name = line[:delim_pos].strip() 

124 if name == stop: 

125 if replace is not None: 

126 (rqs if rq else rps).append(replace) 

127 stopped = True 

128 break 

129 (rqs if rq else rps).append( 

130 bytes.fromhex("80" + line[delim_pos+1:].replace(" ", "")) 

131 ) 

132 rq = not rq 

133 

134 if stop is not None and not stopped: 

135 raise RuntimeError(f"Invalid spec parsing: specified stop at '{stop}' " 

136 "but exchange not found") 

137 return {"requests": rqs, "responses": rps} 

138 

139 def spec_to_exchange(self, spec, trim=False): 

140 trim_length = spec[0][-1] if trim else 0 

141 block_size = len(spec[0]) - trim_length 

142 chunk_size = spec[1] 

143 exchanges = [bytes([0, 0, 0x04, chunk_size])]*(block_size//chunk_size) 

144 remaining = block_size - len(exchanges)*chunk_size 

145 exchanges = [bytes([0, 0, 0x03])] + exchanges + \ 

146 [bytes([0, 0, 0x04, remaining])] 

147 

148 # Spec has brothers? 

149 if len(spec) == 3: 

150 exchanges += [bytes([0, 0, 0x07])] # Request brother list metadata 

151 if len(spec) == 3 and spec[2] is not None: 

152 brother_count = len(spec[2][0]) 

153 chunk_size = spec[2][1] 

154 for i in range(brother_count): 

155 brother_size = len(spec[2][0][i]) 

156 bro_exchanges = [bytes([0, 0, 0x09, chunk_size])] * \ 

157 (brother_size//chunk_size) 

158 remaining = brother_size - len(bro_exchanges)*chunk_size 

159 exchanges += [bytes([0, 0, 0x08])] + \ 

160 bro_exchanges + \ 

161 [bytes([0, 0, 0x09, remaining])] 

162 

163 return exchanges 

164 

165 def assert_exchange(self, payloads, timeouts=None): 

166 def ensure_cla(bs): 

167 if bs[0] != 0x80: 

168 return bytes([0x80]) + bs 

169 return bs 

170 

171 if timeouts is None: 

172 timeouts = [None]*len(payloads) 

173 calls = list( 

174 map( 

175 lambda z: call( 

176 ensure_cla(bytes(z[0])), 

177 timeout=(z[1] if z[1] is not None else self.DONGLE_EXCHANGE_TIMEOUT), 

178 ), 

179 zip(payloads, timeouts), 

180 )) 

181 

182 self.assertEqual( 

183 len(payloads), 

184 len(self.dongle.exchange.call_args_list), 

185 msg="# of exchanges mismatch", 

186 ) 

187 

188 for i, c in enumerate(calls): 

189 if c != self.dongle.exchange.call_args_list[i]: 

190 print("E:", c) 

191 print("A:", self.dongle.exchange.call_args_list[i]) 

192 self.assertEqual( 

193 c, 

194 self.dongle.exchange.call_args_list[i], 

195 msg="%dth exchange failed" % (i + 1), 

196 ) 

197 

198 def do_sign_auth(self, spec): 

199 return self.hsm2dongle.sign_authorized( 

200 key_id=spec["keyid"], 

201 rsk_tx_receipt=spec["receipt"], 

202 receipt_merkle_proof=spec["mp"], 

203 btc_tx=spec["tx"], 

204 input_index=spec["input"], 

205 sighash_computation_mode=spec["mode"], 

206 witness_script=spec["ws"], 

207 outpoint_value=spec["ov"], 

208 ) 

209 

210 def process_sign_auth_spec(self, spec, stop=None, replace=None): 

211 pex = self.parse_exchange_spec(spec["exchanges"], stop=stop, replace=replace) 

212 spec["requests"] = pex["requests"] 

213 spec["responses"] = pex["responses"] 

214 self.dongle.exchange.side_effect = spec["responses"] 

215 return spec 

216 

217 

218class TestHSM2Dongle(TestHSM2DongleBase): 

219 def test_dongle_error_codes(self): 

220 # Make sure enums are ok wrt signer definitions by testing a couple 

221 # of arbitrary values 

222 self.assertEqual(0x6B8C, self.hsm2dongle.ERR.ADVANCE.RECEIPT_ROOT_INVALID.value) 

223 self.assertEqual(0x6B93, self.hsm2dongle.ERR.ADVANCE.MM_RLP_LEN_MISMATCH.value) 

224 self.assertEqual(0x6BA1, self.hsm2dongle.ERR.ADVANCE.BROTHER_ORDER_INVALID.value) 

225 self.assertEqual(0x6A8F, self.hsm2dongle.ERR.SIGN.INVALID_PATH) 

226 self.assertEqual( 

227 0x6A97, 

228 self.hsm2dongle.ERR.SIGN.INVALID_SIGHASH_COMPUTATION_MODE.value 

229 ) 

230 

231 def test_connects_ok(self): 

232 self.assertEqual([call("a-debug-value")], self.getDongleMock.call_args_list) 

233 

234 @patch("ledger.hsm2dongle.getDongle") 

235 def test_connects_error_comm(self, getDongleMock): 

236 getDongleMock.side_effect = CommException("a-message") 

237 with self.assertRaises(HSM2DongleCommError): 

238 self.hsm2dongle.connect() 

239 

240 @patch("ledger.hsm2dongle.getDongle") 

241 def test_connects_error_other(self, getDongleMock): 

242 getDongleMock.side_effect = ValueError() 

243 with self.assertRaises(ValueError): 

244 self.hsm2dongle.connect() 

245 

246 def test_get_current_mode(self): 

247 self.dongle.exchange.return_value = bytes([10, 2, 30]) 

248 mode = self.hsm2dongle.get_current_mode() 

249 self.assertEqual(2, mode) 

250 self.assertEqual(self.hsm2dongle.MODE, type(mode)) 

251 self.assert_exchange([[0x43]]) 

252 

253 def test_echo(self): 

254 self.dongle.exchange.return_value = bytes([0x80, 0x02, 0x41, 0x42, 0x43]) 

255 self.assertTrue(self.hsm2dongle.echo()) 

256 self.assert_exchange([[0x02, 0x41, 0x42, 0x43]]) 

257 

258 def test_echo_error(self): 

259 self.dongle.exchange.return_value = bytes([1, 2, 3]) 

260 self.assertFalse(self.hsm2dongle.echo()) 

261 self.assert_exchange([[0x02, 0x41, 0x42, 0x43]]) 

262 

263 def test_is_onboarded_yes(self): 

264 self.dongle.exchange.return_value = bytes([0, 1, 0]) 

265 self.assertTrue(self.hsm2dongle.is_onboarded()) 

266 self.assert_exchange([[0x06]]) 

267 

268 def test_is_onboarded_no(self): 

269 self.dongle.exchange.return_value = bytes([0, 0, 0]) 

270 self.assertFalse(self.hsm2dongle.is_onboarded()) 

271 self.assert_exchange([[0x06]]) 

272 

273 def test_onboard_ok(self): 

274 self.dongle.exchange.side_effect = [bytes([0])]*(32 + 5) + [bytes([0, 2, 0])] 

275 

276 self.assertTrue( 

277 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234")) 

278 

279 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(32))) 

280 pin_exchanges = [[0x41, 0, 4]] + list( 

281 map(lambda i: [0x41, i + 1, ord(str(i + 1))], range(4))) 

282 exchanges = seed_exchanges + pin_exchanges + [[0x07]] 

283 timeouts = [None]*len(exchanges) 

284 timeouts[-1] = HSM2Dongle.ONBOARDING.TIMEOUT 

285 self.assert_exchange(exchanges, timeouts) 

286 

287 def test_onboard_wipe_error(self): 

288 self.dongle.exchange.side_effect = [bytes([0])]*(32 + 5) + [bytes([0, 1, 0])] 

289 

290 with self.assertRaises(HSM2DongleError): 

291 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234") 

292 

293 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(32))) 

294 pin_exchanges = [[0x41, 0, 4]] + list( 

295 map(lambda i: [0x41, i + 1, ord(str(i + 1))], range(4))) 

296 exchanges = seed_exchanges + pin_exchanges + [[0x07]] 

297 timeouts = [None]*len(exchanges) 

298 timeouts[-1] = HSM2Dongle.ONBOARDING.TIMEOUT 

299 self.assert_exchange(exchanges, timeouts) 

300 

301 def test_onboard_pin_error(self): 

302 self.dongle.exchange.side_effect = [bytes([0])]*(32 + 3) + [ 

303 CommException("an-error") 

304 ] 

305 

306 with self.assertRaises(HSM2DongleError): 

307 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234") 

308 

309 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(32))) 

310 pin_exchanges = [[0x41, 0, 4]] + list( 

311 map(lambda i: [0x41, i + 1, ord(str(i + 1))], range(3))) 

312 exchanges = seed_exchanges + pin_exchanges 

313 self.assert_exchange(exchanges) 

314 

315 def test_onboard_seed_error(self): 

316 self.dongle.exchange.side_effect = [bytes([0])]*30 + [CommException("an-error")] 

317 

318 with self.assertRaises(HSM2DongleError): 

319 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234") 

320 

321 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(31))) 

322 self.assert_exchange(seed_exchanges) 

323 

324 def test_unlock_ok(self): 

325 self.dongle.exchange.side_effect = [ 

326 bytes([0]), 

327 bytes([1]), 

328 bytes([2]), 

329 bytes([0, 0, 1]), 

330 ] 

331 self.assertTrue(self.hsm2dongle.unlock(bytes([1, 2, 3]))) 

332 self.assert_exchange([[0x41, 0, 1], [0x41, 1, 2], [0x41, 2, 3], 

333 [0xFE, 0x00, 0x00]]) 

334 

335 def test_unlock_pinerror(self): 

336 self.dongle.exchange.side_effect = [ 

337 bytes([0]), 

338 bytes([1]), 

339 bytes([2]), 

340 bytes([0, 0, 0]), 

341 ] 

342 self.assertFalse(self.hsm2dongle.unlock(bytes([1, 2, 3]))) 

343 self.assert_exchange([[0x41, 0, 1], [0x41, 1, 2], [0x41, 2, 3], 

344 [0xFE, 0x00, 0x00]]) 

345 

346 def test_new_pin(self): 

347 self.dongle.exchange.side_effect = [ 

348 bytes([0]), 

349 bytes([1]), 

350 bytes([2]), 

351 bytes([3]), 

352 bytes([4]), 

353 ] 

354 self.hsm2dongle.new_pin(bytes([4, 5, 6])) 

355 self.assert_exchange([[0x41, 0, 3], [0x41, 1, 4], [0x41, 2, 5], [0x41, 3, 6], 

356 [0x08]]) 

357 

358 def test_version(self): 

359 self.dongle.exchange.return_value = bytes([0, 0, 6, 7, 8]) 

360 version = self.hsm2dongle.get_version() 

361 self.assertEqual(HSM2FirmwareVersion, type(version)) 

362 self.assertEqual(6, version.major) 

363 self.assertEqual(7, version.minor) 

364 self.assertEqual(8, version.patch) 

365 self.assert_exchange([[0x06]]) 

366 

367 def test_retries(self): 

368 self.dongle.exchange.return_value = bytes([0, 0, 57]) 

369 retries = self.hsm2dongle.get_retries() 

370 self.assertEqual(57, retries) 

371 self.assert_exchange([[0x45]]) 

372 

373 def test_exit_menu(self): 

374 self.dongle.exchange.return_value = bytes([0]) 

375 self.hsm2dongle.exit_menu() 

376 self.assert_exchange([[0xFF, 0x00, 0x00]]) 

377 

378 def test_exit_menu_explicit_autoexec(self): 

379 self.dongle.exchange.return_value = bytes([0]) 

380 self.hsm2dongle.exit_menu(autoexec=True) 

381 self.assert_exchange([[0xFF, 0x00, 0x00]]) 

382 

383 def test_exit_menu_no_autoexec(self): 

384 self.dongle.exchange.return_value = bytes([0]) 

385 self.hsm2dongle.exit_menu(autoexec=False) 

386 self.assert_exchange([[0xFA, 0x00, 0x00]]) 

387 

388 def test_exit_app(self): 

389 self.dongle.exchange.side_effect = OSError("read error") 

390 with self.assertRaises(HSM2DongleCommError): 

391 self.hsm2dongle.exit_app() 

392 self.assert_exchange([[0xFF]]) 

393 

394 def test_get_public_key_ok(self): 

395 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")}) 

396 self.dongle.exchange.return_value = bytes.fromhex("aabbccddee") 

397 self.assertEqual("aabbccddee", self.hsm2dongle.get_public_key(key_id)) 

398 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]]) 

399 

400 def test_get_public_key_invalid_keyid(self): 

401 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")}) 

402 self.dongle.exchange.side_effect = CommException("some message", 0x6A87) 

403 with self.assertRaises(HSM2DongleErrorResult): 

404 self.hsm2dongle.get_public_key(key_id) 

405 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]]) 

406 

407 def test_get_public_key_timeout(self): 

408 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")}) 

409 self.dongle.exchange.side_effect = CommException("Timeout") 

410 with self.assertRaises(HSM2DongleTimeoutError): 

411 self.hsm2dongle.get_public_key(key_id) 

412 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]]) 

413 

414 def test_get_public_key_other_error(self): 

415 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")}) 

416 self.dongle.exchange.side_effect = CommException("some other message", 0xFFFF) 

417 with self.assertRaises(HSM2DongleError): 

418 self.assertEqual("aabbccddee", self.hsm2dongle.get_public_key(key_id)) 

419 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]]) 

420 

421 

422class TestHSM2DongleSignUnauthorized(TestHSM2DongleBase): 

423 @patch("ledger.hsm2dongle.HSM2DongleSignature") 

424 def test_sign_unauthorized_ok(self, HSM2DongleSignatureMock): 

425 HSM2DongleSignatureMock.return_value = "the-signature" 

426 self.dongle.exchange.side_effect = [ 

427 bytes([0, 0, 0x81, 0x55, 0x66, 0x77, 0x88]), # Response to path and hash 

428 ] 

429 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")}) 

430 self.assertEqual( 

431 (True, "the-signature"), 

432 self.hsm2dongle.sign_unauthorized(key_id=key_id, hash="aabbccddeeff"), 

433 ) 

434 

435 self.assert_exchange([ 

436 [ 

437 0x02, 

438 0x01, 

439 0x11, 

440 0x22, 

441 0x33, 

442 0x44, 

443 0xAA, 

444 0xBB, 

445 0xCC, 

446 0xDD, 

447 0xEE, 

448 0xFF, 

449 ], # Path and hash 

450 ]) 

451 self.assertEqual( 

452 [call(bytes([0x55, 0x66, 0x77, 0x88]))], 

453 HSM2DongleSignatureMock.call_args_list, 

454 ) 

455 

456 @patch("ledger.hsm2dongle.HSM2DongleSignature") 

457 def test_sign_unauthorized_invalid_signature(self, HSM2DongleSignatureMock): 

458 HSM2DongleSignatureMock.side_effect = ValueError() 

459 self.dongle.exchange.side_effect = [ 

460 bytes([0, 0, 0x81, 0x55, 0x66, 0x77, 0x88]), # Response to path and hash 

461 ] 

462 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")}) 

463 self.assertEqual( 

464 (False, -10), 

465 self.hsm2dongle.sign_unauthorized(key_id=key_id, hash="aabbccddeeff"), 

466 ) 

467 

468 self.assert_exchange([ 

469 [ 

470 0x02, 

471 0x01, 

472 0x11, 

473 0x22, 

474 0x33, 

475 0x44, 

476 0xAA, 

477 0xBB, 

478 0xCC, 

479 0xDD, 

480 0xEE, 

481 0xFF, 

482 ], # Path and hash 

483 ]) 

484 self.assertEqual( 

485 [call(bytes([0x55, 0x66, 0x77, 0x88]))], 

486 HSM2DongleSignatureMock.call_args_list, 

487 ) 

488 

489 @parameterized.expand([ 

490 ("data_size", 0x6A87, -5), 

491 ("data_size_noauth", 0x6A91, -5), 

492 ("invalid_path", 0x6A8F, -1), 

493 ("data_size_auth", 0x6A90, -1), 

494 ("unknown", 0x6AFF, -10), 

495 ("btc_tx", [0, 0, 0x02], -5), 

496 ("unexpected", [0, 0, 0xAA], -10), 

497 ]) 

498 def test_sign_unauthorized_dongle_error_result(self, _, device_error, 

499 expected_response): 

500 if type(device_error) == int: 

501 last_exchange = CommException("msg", device_error) 

502 else: 

503 last_exchange = bytes(device_error) 

504 self.dongle.exchange.side_effect = [last_exchange] # Response to path and hash 

505 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")}) 

506 self.assertEqual( 

507 (False, expected_response), 

508 self.hsm2dongle.sign_unauthorized(key_id=key_id, hash="aabbccddeeff"), 

509 ) 

510 

511 self.assert_exchange([ 

512 [ 

513 0x02, 

514 0x01, 

515 0x11, 

516 0x22, 

517 0x33, 

518 0x44, 

519 0xAA, 

520 0xBB, 

521 0xCC, 

522 0xDD, 

523 0xEE, 

524 0xFF, 

525 ], # Path and hash 

526 ]) 

527 

528 def test_sign_unauthorized_invalid_hash(self): 

529 self.assertEqual( 

530 (False, -5), 

531 self.hsm2dongle.sign_unauthorized(key_id="doesn't matter", hash="not-a-hex"), 

532 ) 

533 

534 self.assertFalse(self.dongle.exchange.called) 

535 

536 

537class TestHSM2DongleBlockchainState(TestHSM2DongleBase): 

538 def test_get_blockchain_state_ok(self): 

539 self.dongle.exchange.side_effect = [ 

540 bytes([0, 0, 0x01, 0x01]) + 

541 bytes.fromhex("11"*32), # Response to get best_block 

542 bytes([0, 0, 0x01, 0x02]) + 

543 bytes.fromhex("22"*32), # Response to get newest_valid_block 

544 bytes([0, 0, 0x01, 0x03]) + 

545 bytes.fromhex("33"*32), # Response to get ancestor_block 

546 bytes([0, 0, 0x01, 0x05]) + 

547 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root 

548 bytes([0, 0, 0x01, 0x81]) + 

549 bytes.fromhex("55"*32), # Response to get updating.best_block 

550 bytes([0, 0, 0x01, 0x82]) + 

551 bytes.fromhex("66"*32), # Response to get updating.newest_valid_block 

552 bytes([0, 0, 0x01, 0x84]) + 

553 bytes.fromhex("77"*32), # Response to get updating.next_expected_block 

554 bytes([0, 0, 0x02]) + 

555 bytes.fromhex("112233445566"), # Response to get difficulty 

556 bytes([0, 0, 0x03, 0x00, 0xFF, 0xFF]), # Response to get flags 

557 ] 

558 self.assertEqual( 

559 { 

560 "best_block": 

561 "11"*32, 

562 "newest_valid_block": 

563 "22"*32, 

564 "ancestor_block": 

565 "33"*32, 

566 "ancestor_receipts_root": 

567 "44"*32, 

568 "updating.best_block": 

569 "55"*32, 

570 "updating.newest_valid_block": 

571 "66"*32, 

572 "updating.next_expected_block": 

573 "77"*32, 

574 "updating.total_difficulty": 

575 int.from_bytes( 

576 bytes.fromhex("112233445566"), byteorder="big", signed=False), 

577 "updating.in_progress": 

578 False, 

579 "updating.already_validated": 

580 True, 

581 "updating.found_best_block": 

582 True, 

583 }, 

584 self.hsm2dongle.get_blockchain_state(), 

585 ) 

586 

587 self.assert_exchange([ 

588 [0x20, 0x01, 0x01], 

589 [0x20, 0x01, 0x02], 

590 [0x20, 0x01, 0x03], 

591 [0x20, 0x01, 0x05], 

592 [0x20, 0x01, 0x81], 

593 [0x20, 0x01, 0x82], 

594 [0x20, 0x01, 0x84], 

595 [0x20, 0x02], 

596 [0x20, 0x03], 

597 ]) 

598 

599 def test_get_blockchain_state_error_hash(self): 

600 self.dongle.exchange.side_effect = [ 

601 bytes([0, 0, 0x01, 0x01]) + 

602 bytes.fromhex("11"*32), # Response to get best_block 

603 bytes([0, 0, 0x01, 0x02]) + 

604 bytes.fromhex("22"*32), # Response to get newest_valid_block 

605 bytes([0, 0, 0x01, 0x03]) + 

606 bytes.fromhex("33"*32), # Response to get ancestor_block 

607 bytes([0, 0, 0x01, 0x05]) + 

608 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root 

609 bytes([0, 0, 0xAA]), # Response to get updating.best_block 

610 ] 

611 

612 with self.assertRaises(HSM2DongleError): 

613 self.hsm2dongle.get_blockchain_state() 

614 

615 self.assert_exchange([ 

616 [0x20, 0x01, 0x01], 

617 [0x20, 0x01, 0x02], 

618 [0x20, 0x01, 0x03], 

619 [0x20, 0x01, 0x05], 

620 [0x20, 0x01, 0x81], 

621 ]) 

622 

623 def test_get_blockchain_state_error_difficulty(self): 

624 self.dongle.exchange.side_effect = [ 

625 bytes([0, 0, 0x01, 0x01]) + 

626 bytes.fromhex("11"*32), # Response to get best_block 

627 bytes([0, 0, 0x01, 0x02]) + 

628 bytes.fromhex("22"*32), # Response to get newest_valid_block 

629 bytes([0, 0, 0x01, 0x03]) + 

630 bytes.fromhex("33"*32), # Response to get ancestor_block 

631 bytes([0, 0, 0x01, 0x05]) + 

632 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root 

633 bytes([0, 0, 0x01, 0x81]) + 

634 bytes.fromhex("55"*32), # Response to get ancestor_receipts_root 

635 bytes([0, 0, 0x01, 0x82]) + 

636 bytes.fromhex("66"*32), # Response to get ancestor_receipts_root 

637 bytes([0, 0, 0x01, 0x84]) + 

638 bytes.fromhex("77"*32), # Response to get ancestor_receipts_root 

639 CommException("a-message"), 

640 ] 

641 

642 with self.assertRaises(HSM2DongleError): 

643 self.hsm2dongle.get_blockchain_state() 

644 

645 self.assert_exchange([ 

646 [0x20, 0x01, 0x01], 

647 [0x20, 0x01, 0x02], 

648 [0x20, 0x01, 0x03], 

649 [0x20, 0x01, 0x05], 

650 [0x20, 0x01, 0x81], 

651 [0x20, 0x01, 0x82], 

652 [0x20, 0x01, 0x84], 

653 [0x20, 0x02], 

654 ]) 

655 

656 def test_get_blockchain_state_error_flags(self): 

657 self.dongle.exchange.side_effect = [ 

658 bytes([0, 0, 0x01, 0x01]) + 

659 bytes.fromhex("11"*32), # Response to get best_block 

660 bytes([0, 0, 0x01, 0x02]) + 

661 bytes.fromhex("22"*32), # Response to get newest_valid_block 

662 bytes([0, 0, 0x01, 0x03]) + 

663 bytes.fromhex("33"*32), # Response to get ancestor_block 

664 bytes([0, 0, 0x01, 0x05]) + 

665 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root 

666 bytes([0, 0, 0x01, 0x81]) + 

667 bytes.fromhex("55"*32), # Response to get ancestor_receipts_root 

668 bytes([0, 0, 0x01, 0x82]) + 

669 bytes.fromhex("66"*32), # Response to get ancestor_receipts_root 

670 bytes([0, 0, 0x01, 0x84]) + 

671 bytes.fromhex("77"*32), # Response to get ancestor_receipts_root 

672 bytes([0, 0, 0x02, 0xFF]), # Response to get difficulty 

673 bytes([0, 0, 0x04]), # Response to get flags 

674 ] 

675 

676 with self.assertRaises(HSM2DongleError): 

677 self.hsm2dongle.get_blockchain_state() 

678 

679 self.assert_exchange([ 

680 [0x20, 0x01, 0x01], 

681 [0x20, 0x01, 0x02], 

682 [0x20, 0x01, 0x03], 

683 [0x20, 0x01, 0x05], 

684 [0x20, 0x01, 0x81], 

685 [0x20, 0x01, 0x82], 

686 [0x20, 0x01, 0x84], 

687 [0x20, 0x02], 

688 [0x20, 0x03], 

689 ]) 

690 

691 def test_reset_advance_blockchain_ok(self): 

692 self.dongle.exchange.side_effect = [ 

693 bytes([0, 0, 0x02]), # Response 

694 ] 

695 self.assertTrue(self.hsm2dongle.reset_advance_blockchain()) 

696 

697 self.assert_exchange([ 

698 [0x21, 0x01], 

699 ]) 

700 

701 def test_reset_advance_blockchain_invalid_response(self): 

702 self.dongle.exchange.side_effect = [ 

703 bytes([0, 0, 0xAA]), # Response 

704 ] 

705 with self.assertRaises(HSM2DongleError): 

706 self.hsm2dongle.reset_advance_blockchain() 

707 

708 self.assert_exchange([ 

709 [0x21, 0x01], 

710 ]) 

711 

712 def test_reset_advance_blockchain_exception(self): 

713 self.dongle.exchange.side_effect = [CommException("a-message")] 

714 with self.assertRaises(HSM2DongleError): 

715 self.hsm2dongle.reset_advance_blockchain() 

716 

717 self.assert_exchange([ 

718 [0x21, 0x01], 

719 ])