Coverage for tests/ledger/test_hsm2dongle.py: 99%

427 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-05 20:41 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23from unittest import TestCase 

24from unittest.mock import Mock, patch, call 

25from parameterized import parameterized 

26from ledger.hsm2dongle import ( 

27 HSM2Dongle, 

28 HSM2DongleError, 

29 HSM2DongleCommError, 

30 HSM2DongleTimeoutError, 

31 HSM2DongleErrorResult, 

32) 

33from ledger.version import HSM2FirmwareVersion 

34from ledgerblue.commException import CommException 

35 

36import logging 

37 

38logging.disable(logging.CRITICAL) 

39 

40 

41class TestHSM2DongleBase(TestCase): 

42 DONGLE_EXCHANGE_TIMEOUT = 10 

43 

44 CHUNK_ERROR_MAPPINGS = [ 

45 ("prot_invalid", 0x6B87, -4), 

46 ("rlp_invalid", 0x6B88, -5), 

47 ("block_too_old", 0x6B89, -5), 

48 ("block_too_short", 0x6B8A, -5), 

49 ("parent_hash_invalid", 0x6B8B, -5), 

50 ("block_num_invalid", 0x6B8D, -5), 

51 ("block_diff_invalid", 0x6B8E, -5), 

52 ("umm_root_invalid", 0x6B8F, -5), 

53 ("btc_header_invalid", 0x6B90, -5), 

54 ("merkle_proof_invalid", 0x6B91, -5), 

55 ("btc_cb_txn_invalid", 0x6B92, -6), 

56 ("mm_rlp_len_mismatch", 0x6B93, -5), 

57 ("btc_diff_mismatch", 0x6B94, -6), 

58 ("merkle_proof_mismatch", 0x6B95, -6), 

59 ("mm_hash_mismatch", 0x6B96, -6), 

60 ("merkle_proof_overflow", 0x6B97, -5), 

61 ("cb_txn_overflow", 0x6B98, -5), 

62 ("buffer_overflow", 0x6B99, -5), 

63 ("chain_mismatch", 0x6B9A, -7), 

64 ("total_diff_overflow", 0x6B9B, -8), 

65 ("cb_txn_hash_mismatch", 0x6B9D, -6), 

66 ("brothers_too_many", 0x6B9E, -9), 

67 ("brother_parent_mismatch", 0x6B9F, -9), 

68 ("brother_same_as_block", 0x6BA0, -9), 

69 ("brother_order_invalid", 0x6BA1, -9), 

70 ("unexpected", 0x6BFF, -10), 

71 ("error_response", bytes([0, 0, 0xFF]), -10), 

72 ] 

73 

74 @patch("ledger.hsm2dongle.getDongle") 

75 def setUp(self, getDongleMock): 

76 self.dongle = Mock() 

77 self.getDongleMock = getDongleMock 

78 self.getDongleMock.return_value = self.dongle 

79 self.hsm2dongle = HSM2Dongle("a-debug-value") 

80 self.hsm2dongle.connect() 

81 

82 def buf(self, size): 

83 return bytes(map(lambda b: b % 256, range(size))) 

84 

85 def parse_exchange_spec(self, spec, stop=None, replace=None): 

86 rqs = [] 

87 rps = [] 

88 rq = True 

89 stopped = False 

90 for line in spec: 

91 delim = ">" if rq else "<" 

92 delim_pos = line.find(delim) 

93 if delim_pos == -1: 

94 raise RuntimeError("Invalid spec prefix") 

95 name = line[:delim_pos].strip() 

96 if name == stop: 

97 if replace is not None: 

98 (rqs if rq else rps).append(replace) 

99 stopped = True 

100 break 

101 (rqs if rq else rps).append( 

102 bytes.fromhex("80" + line[delim_pos+1:].replace(" ", "")) 

103 ) 

104 rq = not rq 

105 

106 if stop is not None and not stopped: 

107 raise RuntimeError(f"Invalid spec parsing: specified stop at '{stop}' " 

108 "but exchange not found") 

109 return {"requests": rqs, "responses": rps} 

110 

111 def spec_to_exchange(self, spec, trim=False): 

112 trim_length = spec[0][-1] if trim else 0 

113 block_size = len(spec[0]) - trim_length 

114 chunk_size = spec[1] 

115 exchanges = [bytes([0, 0, 0x04, chunk_size])]*(block_size//chunk_size) 

116 remaining = block_size - len(exchanges)*chunk_size 

117 exchanges = [bytes([0, 0, 0x03])] + exchanges + \ 

118 [bytes([0, 0, 0x04, remaining])] 

119 

120 # Spec has brothers? 

121 if len(spec) == 3: 

122 exchanges += [bytes([0, 0, 0x07])] # Request brother list metadata 

123 if len(spec) == 3 and spec[2] is not None: 

124 brother_count = len(spec[2][0]) 

125 chunk_size = spec[2][1] 

126 for i in range(brother_count): 

127 brother_size = len(spec[2][0][i]) 

128 bro_exchanges = [bytes([0, 0, 0x09, chunk_size])] * \ 

129 (brother_size//chunk_size) 

130 remaining = brother_size - len(bro_exchanges)*chunk_size 

131 exchanges += [bytes([0, 0, 0x08])] + \ 

132 bro_exchanges + \ 

133 [bytes([0, 0, 0x09, remaining])] 

134 

135 return exchanges 

136 

137 def assert_exchange(self, payloads, timeouts=None): 

138 def ensure_cla(bs): 

139 if bs[0] != 0x80: 

140 return bytes([0x80]) + bs 

141 return bs 

142 

143 if timeouts is None: 

144 timeouts = [None]*len(payloads) 

145 calls = list( 

146 map( 

147 lambda z: call( 

148 ensure_cla(bytes(z[0])), 

149 timeout=(z[1] if z[1] is not None else self.DONGLE_EXCHANGE_TIMEOUT), 

150 ), 

151 zip(payloads, timeouts), 

152 )) 

153 

154 self.assertEqual( 

155 len(payloads), 

156 len(self.dongle.exchange.call_args_list), 

157 msg="# of exchanges mismatch", 

158 ) 

159 

160 for i, c in enumerate(calls): 

161 if c != self.dongle.exchange.call_args_list[i]: 

162 print("E:", c) 

163 print("A:", self.dongle.exchange.call_args_list[i]) 

164 self.assertEqual( 

165 c, 

166 self.dongle.exchange.call_args_list[i], 

167 msg="%dth exchange failed" % (i + 1), 

168 ) 

169 

170 def do_sign_auth(self, spec): 

171 return self.hsm2dongle.sign_authorized( 

172 key_id=spec["keyid"], 

173 rsk_tx_receipt=spec["receipt"], 

174 receipt_merkle_proof=spec["mp"], 

175 btc_tx=spec["tx"], 

176 input_index=spec["input"], 

177 sighash_computation_mode=spec["mode"], 

178 witness_script=spec["ws"], 

179 outpoint_value=spec["ov"], 

180 ) 

181 

182 def process_sign_auth_spec(self, spec, stop=None, replace=None): 

183 pex = self.parse_exchange_spec(spec["exchanges"], stop=stop, replace=replace) 

184 spec["requests"] = pex["requests"] 

185 spec["responses"] = pex["responses"] 

186 self.dongle.exchange.side_effect = spec["responses"] 

187 return spec 

188 

189 

190class TestHSM2Dongle(TestHSM2DongleBase): 

191 def test_dongle_error_codes(self): 

192 # Make sure enums are ok wrt signer definitions by testing a couple 

193 # of arbitrary values 

194 self.assertEqual(0x6B8C, self.hsm2dongle.ERR.ADVANCE.RECEIPT_ROOT_INVALID.value) 

195 self.assertEqual(0x6B93, self.hsm2dongle.ERR.ADVANCE.MM_RLP_LEN_MISMATCH.value) 

196 self.assertEqual(0x6BA1, self.hsm2dongle.ERR.ADVANCE.BROTHER_ORDER_INVALID.value) 

197 self.assertEqual(0x6A8F, self.hsm2dongle.ERR.SIGN.INVALID_PATH) 

198 self.assertEqual( 

199 0x6A97, 

200 self.hsm2dongle.ERR.SIGN.INVALID_SIGHASH_COMPUTATION_MODE.value 

201 ) 

202 

203 def test_connects_ok(self): 

204 self.assertEqual([call("a-debug-value")], self.getDongleMock.call_args_list) 

205 

206 @patch("ledger.hsm2dongle.getDongle") 

207 def test_connects_error_comm(self, getDongleMock): 

208 getDongleMock.side_effect = CommException("a-message") 

209 with self.assertRaises(HSM2DongleCommError): 

210 self.hsm2dongle.connect() 

211 

212 @patch("ledger.hsm2dongle.getDongle") 

213 def test_connects_error_other(self, getDongleMock): 

214 getDongleMock.side_effect = ValueError() 

215 with self.assertRaises(ValueError): 

216 self.hsm2dongle.connect() 

217 

218 def test_get_current_mode(self): 

219 self.dongle.exchange.return_value = bytes([10, 2, 30]) 

220 mode = self.hsm2dongle.get_current_mode() 

221 self.assertEqual(2, mode) 

222 self.assertEqual(self.hsm2dongle.MODE, type(mode)) 

223 self.assert_exchange([[0x43]]) 

224 

225 def test_echo(self): 

226 self.dongle.exchange.return_value = bytes([0x80, 0x02, 0x41, 0x42, 0x43]) 

227 self.assertTrue(self.hsm2dongle.echo()) 

228 self.assert_exchange([[0x02, 0x41, 0x42, 0x43]]) 

229 

230 def test_echo_error(self): 

231 self.dongle.exchange.return_value = bytes([1, 2, 3]) 

232 self.assertFalse(self.hsm2dongle.echo()) 

233 self.assert_exchange([[0x02, 0x41, 0x42, 0x43]]) 

234 

235 def test_is_onboarded_yes(self): 

236 self.dongle.exchange.return_value = bytes([0, 1, 0]) 

237 self.assertTrue(self.hsm2dongle.is_onboarded()) 

238 self.assert_exchange([[0x06]]) 

239 

240 def test_is_onboarded_no(self): 

241 self.dongle.exchange.return_value = bytes([0, 0, 0]) 

242 self.assertFalse(self.hsm2dongle.is_onboarded()) 

243 self.assert_exchange([[0x06]]) 

244 

245 def test_onboard_ok(self): 

246 self.dongle.exchange.side_effect = [bytes([0])]*(32 + 5) + [bytes([0, 2, 0])] 

247 

248 self.assertTrue( 

249 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234")) 

250 

251 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(32))) 

252 pin_exchanges = [[0x41, 0, 4]] + list( 

253 map(lambda i: [0x41, i + 1, ord(str(i + 1))], range(4))) 

254 exchanges = seed_exchanges + pin_exchanges + [[0x07]] 

255 timeouts = [None]*len(exchanges) 

256 timeouts[-1] = HSM2Dongle.ONBOARDING.TIMEOUT 

257 self.assert_exchange(exchanges, timeouts) 

258 

259 def test_onboard_wipe_error(self): 

260 self.dongle.exchange.side_effect = [bytes([0])]*(32 + 5) + [bytes([0, 1, 0])] 

261 

262 with self.assertRaises(HSM2DongleError): 

263 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234") 

264 

265 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(32))) 

266 pin_exchanges = [[0x41, 0, 4]] + list( 

267 map(lambda i: [0x41, i + 1, ord(str(i + 1))], range(4))) 

268 exchanges = seed_exchanges + pin_exchanges + [[0x07]] 

269 timeouts = [None]*len(exchanges) 

270 timeouts[-1] = HSM2Dongle.ONBOARDING.TIMEOUT 

271 self.assert_exchange(exchanges, timeouts) 

272 

273 def test_onboard_pin_error(self): 

274 self.dongle.exchange.side_effect = [bytes([0])]*(32 + 3) + [ 

275 CommException("an-error") 

276 ] 

277 

278 with self.assertRaises(HSM2DongleError): 

279 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234") 

280 

281 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(32))) 

282 pin_exchanges = [[0x41, 0, 4]] + list( 

283 map(lambda i: [0x41, i + 1, ord(str(i + 1))], range(3))) 

284 exchanges = seed_exchanges + pin_exchanges 

285 self.assert_exchange(exchanges) 

286 

287 def test_onboard_seed_error(self): 

288 self.dongle.exchange.side_effect = [bytes([0])]*30 + [CommException("an-error")] 

289 

290 with self.assertRaises(HSM2DongleError): 

291 self.hsm2dongle.onboard(bytes(map(lambda i: i*2, range(32))), b"1234") 

292 

293 seed_exchanges = list(map(lambda i: [0x44, i, i*2], range(31))) 

294 self.assert_exchange(seed_exchanges) 

295 

296 def test_unlock_ok(self): 

297 self.dongle.exchange.side_effect = [ 

298 bytes([0]), 

299 bytes([1]), 

300 bytes([2]), 

301 bytes([0, 0, 1]), 

302 ] 

303 self.assertTrue(self.hsm2dongle.unlock(bytes([1, 2, 3]))) 

304 self.assert_exchange([[0x41, 0, 1], [0x41, 1, 2], [0x41, 2, 3], 

305 [0xFE, 0x00, 0x00]]) 

306 

307 def test_unlock_pinerror(self): 

308 self.dongle.exchange.side_effect = [ 

309 bytes([0]), 

310 bytes([1]), 

311 bytes([2]), 

312 bytes([0, 0, 0]), 

313 ] 

314 self.assertFalse(self.hsm2dongle.unlock(bytes([1, 2, 3]))) 

315 self.assert_exchange([[0x41, 0, 1], [0x41, 1, 2], [0x41, 2, 3], 

316 [0xFE, 0x00, 0x00]]) 

317 

318 def test_new_pin(self): 

319 self.dongle.exchange.side_effect = [ 

320 bytes([0]), 

321 bytes([1]), 

322 bytes([2]), 

323 bytes([3]), 

324 bytes([4]), 

325 ] 

326 self.hsm2dongle.new_pin(bytes([4, 5, 6])) 

327 self.assert_exchange([[0x41, 0, 3], [0x41, 1, 4], [0x41, 2, 5], [0x41, 3, 6], 

328 [0x08]]) 

329 

330 def test_version(self): 

331 self.dongle.exchange.return_value = bytes([0, 0, 6, 7, 8]) 

332 version = self.hsm2dongle.get_version() 

333 self.assertEqual(HSM2FirmwareVersion, type(version)) 

334 self.assertEqual(6, version.major) 

335 self.assertEqual(7, version.minor) 

336 self.assertEqual(8, version.patch) 

337 self.assert_exchange([[0x06]]) 

338 

339 def test_retries(self): 

340 self.dongle.exchange.return_value = bytes([0, 0, 57]) 

341 retries = self.hsm2dongle.get_retries() 

342 self.assertEqual(57, retries) 

343 self.assert_exchange([[0x45]]) 

344 

345 def test_exit_menu(self): 

346 self.dongle.exchange.return_value = bytes([0]) 

347 self.hsm2dongle.exit_menu() 

348 self.assert_exchange([[0xFF, 0x00, 0x00]]) 

349 

350 def test_exit_menu_explicit_autoexec(self): 

351 self.dongle.exchange.return_value = bytes([0]) 

352 self.hsm2dongle.exit_menu(autoexec=True) 

353 self.assert_exchange([[0xFF, 0x00, 0x00]]) 

354 

355 def test_exit_menu_no_autoexec(self): 

356 self.dongle.exchange.return_value = bytes([0]) 

357 self.hsm2dongle.exit_menu(autoexec=False) 

358 self.assert_exchange([[0xFA, 0x00, 0x00]]) 

359 

360 def test_exit_app(self): 

361 self.dongle.exchange.side_effect = OSError("read error") 

362 with self.assertRaises(HSM2DongleCommError): 

363 self.hsm2dongle.exit_app() 

364 self.assert_exchange([[0xFF]]) 

365 

366 def test_get_public_key_ok(self): 

367 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")}) 

368 self.dongle.exchange.return_value = bytes.fromhex("aabbccddee") 

369 self.assertEqual("aabbccddee", self.hsm2dongle.get_public_key(key_id)) 

370 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]]) 

371 

372 def test_get_public_key_invalid_keyid(self): 

373 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")}) 

374 self.dongle.exchange.side_effect = CommException("some message", 0x6A87) 

375 with self.assertRaises(HSM2DongleErrorResult): 

376 self.hsm2dongle.get_public_key(key_id) 

377 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]]) 

378 

379 def test_get_public_key_timeout(self): 

380 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")}) 

381 self.dongle.exchange.side_effect = CommException("Timeout") 

382 with self.assertRaises(HSM2DongleTimeoutError): 

383 self.hsm2dongle.get_public_key(key_id) 

384 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]]) 

385 

386 def test_get_public_key_other_error(self): 

387 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")}) 

388 self.dongle.exchange.side_effect = CommException("some other message", 0xFFFF) 

389 with self.assertRaises(HSM2DongleError): 

390 self.assertEqual("aabbccddee", self.hsm2dongle.get_public_key(key_id)) 

391 self.assert_exchange([[0x04, 0x11, 0x22, 0x33, 0x44]]) 

392 

393 

394class TestHSM2DongleSignUnauthorized(TestHSM2DongleBase): 

395 @patch("ledger.hsm2dongle.HSM2DongleSignature") 

396 def test_sign_unauthorized_ok(self, HSM2DongleSignatureMock): 

397 HSM2DongleSignatureMock.return_value = "the-signature" 

398 self.dongle.exchange.side_effect = [ 

399 bytes([0, 0, 0x81, 0x55, 0x66, 0x77, 0x88]), # Response to path and hash 

400 ] 

401 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")}) 

402 self.assertEqual( 

403 (True, "the-signature"), 

404 self.hsm2dongle.sign_unauthorized(key_id=key_id, hash="aabbccddeeff"), 

405 ) 

406 

407 self.assert_exchange([ 

408 [ 

409 0x02, 

410 0x01, 

411 0x11, 

412 0x22, 

413 0x33, 

414 0x44, 

415 0xAA, 

416 0xBB, 

417 0xCC, 

418 0xDD, 

419 0xEE, 

420 0xFF, 

421 ], # Path and hash 

422 ]) 

423 self.assertEqual( 

424 [call(bytes([0x55, 0x66, 0x77, 0x88]))], 

425 HSM2DongleSignatureMock.call_args_list, 

426 ) 

427 

428 @patch("ledger.hsm2dongle.HSM2DongleSignature") 

429 def test_sign_unauthorized_invalid_signature(self, HSM2DongleSignatureMock): 

430 HSM2DongleSignatureMock.side_effect = ValueError() 

431 self.dongle.exchange.side_effect = [ 

432 bytes([0, 0, 0x81, 0x55, 0x66, 0x77, 0x88]), # Response to path and hash 

433 ] 

434 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")}) 

435 self.assertEqual( 

436 (False, -10), 

437 self.hsm2dongle.sign_unauthorized(key_id=key_id, hash="aabbccddeeff"), 

438 ) 

439 

440 self.assert_exchange([ 

441 [ 

442 0x02, 

443 0x01, 

444 0x11, 

445 0x22, 

446 0x33, 

447 0x44, 

448 0xAA, 

449 0xBB, 

450 0xCC, 

451 0xDD, 

452 0xEE, 

453 0xFF, 

454 ], # Path and hash 

455 ]) 

456 self.assertEqual( 

457 [call(bytes([0x55, 0x66, 0x77, 0x88]))], 

458 HSM2DongleSignatureMock.call_args_list, 

459 ) 

460 

461 @parameterized.expand([ 

462 ("data_size", 0x6A87, -5), 

463 ("data_size_noauth", 0x6A91, -5), 

464 ("invalid_path", 0x6A8F, -1), 

465 ("data_size_auth", 0x6A90, -1), 

466 ("unknown", 0x6AFF, -10), 

467 ("btc_tx", [0, 0, 0x02], -5), 

468 ("unexpected", [0, 0, 0xAA], -10), 

469 ]) 

470 def test_sign_unauthorized_dongle_error_result(self, _, device_error, 

471 expected_response): 

472 if type(device_error) == int: 

473 last_exchange = CommException("msg", device_error) 

474 else: 

475 last_exchange = bytes(device_error) 

476 self.dongle.exchange.side_effect = [last_exchange] # Response to path and hash 

477 key_id = Mock(**{"to_binary.return_value": bytes.fromhex("11223344")}) 

478 self.assertEqual( 

479 (False, expected_response), 

480 self.hsm2dongle.sign_unauthorized(key_id=key_id, hash="aabbccddeeff"), 

481 ) 

482 

483 self.assert_exchange([ 

484 [ 

485 0x02, 

486 0x01, 

487 0x11, 

488 0x22, 

489 0x33, 

490 0x44, 

491 0xAA, 

492 0xBB, 

493 0xCC, 

494 0xDD, 

495 0xEE, 

496 0xFF, 

497 ], # Path and hash 

498 ]) 

499 

500 def test_sign_unauthorized_invalid_hash(self): 

501 self.assertEqual( 

502 (False, -5), 

503 self.hsm2dongle.sign_unauthorized(key_id="doesn't matter", hash="not-a-hex"), 

504 ) 

505 

506 self.assertFalse(self.dongle.exchange.called) 

507 

508 

509class TestHSM2DongleBlockchainState(TestHSM2DongleBase): 

510 def test_get_blockchain_state_ok(self): 

511 self.dongle.exchange.side_effect = [ 

512 bytes([0, 0, 0x01, 0x01]) + 

513 bytes.fromhex("11"*32), # Response to get best_block 

514 bytes([0, 0, 0x01, 0x02]) + 

515 bytes.fromhex("22"*32), # Response to get newest_valid_block 

516 bytes([0, 0, 0x01, 0x03]) + 

517 bytes.fromhex("33"*32), # Response to get ancestor_block 

518 bytes([0, 0, 0x01, 0x05]) + 

519 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root 

520 bytes([0, 0, 0x01, 0x81]) + 

521 bytes.fromhex("55"*32), # Response to get updating.best_block 

522 bytes([0, 0, 0x01, 0x82]) + 

523 bytes.fromhex("66"*32), # Response to get updating.newest_valid_block 

524 bytes([0, 0, 0x01, 0x84]) + 

525 bytes.fromhex("77"*32), # Response to get updating.next_expected_block 

526 bytes([0, 0, 0x02]) + 

527 bytes.fromhex("112233445566"), # Response to get difficulty 

528 bytes([0, 0, 0x03, 0x00, 0xFF, 0xFF]), # Response to get flags 

529 ] 

530 self.assertEqual( 

531 { 

532 "best_block": 

533 "11"*32, 

534 "newest_valid_block": 

535 "22"*32, 

536 "ancestor_block": 

537 "33"*32, 

538 "ancestor_receipts_root": 

539 "44"*32, 

540 "updating.best_block": 

541 "55"*32, 

542 "updating.newest_valid_block": 

543 "66"*32, 

544 "updating.next_expected_block": 

545 "77"*32, 

546 "updating.total_difficulty": 

547 int.from_bytes( 

548 bytes.fromhex("112233445566"), byteorder="big", signed=False), 

549 "updating.in_progress": 

550 False, 

551 "updating.already_validated": 

552 True, 

553 "updating.found_best_block": 

554 True, 

555 }, 

556 self.hsm2dongle.get_blockchain_state(), 

557 ) 

558 

559 self.assert_exchange([ 

560 [0x20, 0x01, 0x01], 

561 [0x20, 0x01, 0x02], 

562 [0x20, 0x01, 0x03], 

563 [0x20, 0x01, 0x05], 

564 [0x20, 0x01, 0x81], 

565 [0x20, 0x01, 0x82], 

566 [0x20, 0x01, 0x84], 

567 [0x20, 0x02], 

568 [0x20, 0x03], 

569 ]) 

570 

571 def test_get_blockchain_state_error_hash(self): 

572 self.dongle.exchange.side_effect = [ 

573 bytes([0, 0, 0x01, 0x01]) + 

574 bytes.fromhex("11"*32), # Response to get best_block 

575 bytes([0, 0, 0x01, 0x02]) + 

576 bytes.fromhex("22"*32), # Response to get newest_valid_block 

577 bytes([0, 0, 0x01, 0x03]) + 

578 bytes.fromhex("33"*32), # Response to get ancestor_block 

579 bytes([0, 0, 0x01, 0x05]) + 

580 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root 

581 bytes([0, 0, 0xAA]), # Response to get updating.best_block 

582 ] 

583 

584 with self.assertRaises(HSM2DongleError): 

585 self.hsm2dongle.get_blockchain_state() 

586 

587 self.assert_exchange([ 

588 [0x20, 0x01, 0x01], 

589 [0x20, 0x01, 0x02], 

590 [0x20, 0x01, 0x03], 

591 [0x20, 0x01, 0x05], 

592 [0x20, 0x01, 0x81], 

593 ]) 

594 

595 def test_get_blockchain_state_error_difficulty(self): 

596 self.dongle.exchange.side_effect = [ 

597 bytes([0, 0, 0x01, 0x01]) + 

598 bytes.fromhex("11"*32), # Response to get best_block 

599 bytes([0, 0, 0x01, 0x02]) + 

600 bytes.fromhex("22"*32), # Response to get newest_valid_block 

601 bytes([0, 0, 0x01, 0x03]) + 

602 bytes.fromhex("33"*32), # Response to get ancestor_block 

603 bytes([0, 0, 0x01, 0x05]) + 

604 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root 

605 bytes([0, 0, 0x01, 0x81]) + 

606 bytes.fromhex("55"*32), # Response to get ancestor_receipts_root 

607 bytes([0, 0, 0x01, 0x82]) + 

608 bytes.fromhex("66"*32), # Response to get ancestor_receipts_root 

609 bytes([0, 0, 0x01, 0x84]) + 

610 bytes.fromhex("77"*32), # Response to get ancestor_receipts_root 

611 CommException("a-message"), 

612 ] 

613 

614 with self.assertRaises(HSM2DongleError): 

615 self.hsm2dongle.get_blockchain_state() 

616 

617 self.assert_exchange([ 

618 [0x20, 0x01, 0x01], 

619 [0x20, 0x01, 0x02], 

620 [0x20, 0x01, 0x03], 

621 [0x20, 0x01, 0x05], 

622 [0x20, 0x01, 0x81], 

623 [0x20, 0x01, 0x82], 

624 [0x20, 0x01, 0x84], 

625 [0x20, 0x02], 

626 ]) 

627 

628 def test_get_blockchain_state_error_flags(self): 

629 self.dongle.exchange.side_effect = [ 

630 bytes([0, 0, 0x01, 0x01]) + 

631 bytes.fromhex("11"*32), # Response to get best_block 

632 bytes([0, 0, 0x01, 0x02]) + 

633 bytes.fromhex("22"*32), # Response to get newest_valid_block 

634 bytes([0, 0, 0x01, 0x03]) + 

635 bytes.fromhex("33"*32), # Response to get ancestor_block 

636 bytes([0, 0, 0x01, 0x05]) + 

637 bytes.fromhex("44"*32), # Response to get ancestor_receipts_root 

638 bytes([0, 0, 0x01, 0x81]) + 

639 bytes.fromhex("55"*32), # Response to get ancestor_receipts_root 

640 bytes([0, 0, 0x01, 0x82]) + 

641 bytes.fromhex("66"*32), # Response to get ancestor_receipts_root 

642 bytes([0, 0, 0x01, 0x84]) + 

643 bytes.fromhex("77"*32), # Response to get ancestor_receipts_root 

644 bytes([0, 0, 0x02, 0xFF]), # Response to get difficulty 

645 bytes([0, 0, 0x04]), # Response to get flags 

646 ] 

647 

648 with self.assertRaises(HSM2DongleError): 

649 self.hsm2dongle.get_blockchain_state() 

650 

651 self.assert_exchange([ 

652 [0x20, 0x01, 0x01], 

653 [0x20, 0x01, 0x02], 

654 [0x20, 0x01, 0x03], 

655 [0x20, 0x01, 0x05], 

656 [0x20, 0x01, 0x81], 

657 [0x20, 0x01, 0x82], 

658 [0x20, 0x01, 0x84], 

659 [0x20, 0x02], 

660 [0x20, 0x03], 

661 ]) 

662 

663 def test_reset_advance_blockchain_ok(self): 

664 self.dongle.exchange.side_effect = [ 

665 bytes([0, 0, 0x02]), # Response 

666 ] 

667 self.assertTrue(self.hsm2dongle.reset_advance_blockchain()) 

668 

669 self.assert_exchange([ 

670 [0x21, 0x01], 

671 ]) 

672 

673 def test_reset_advance_blockchain_invalid_response(self): 

674 self.dongle.exchange.side_effect = [ 

675 bytes([0, 0, 0xAA]), # Response 

676 ] 

677 with self.assertRaises(HSM2DongleError): 

678 self.hsm2dongle.reset_advance_blockchain() 

679 

680 self.assert_exchange([ 

681 [0x21, 0x01], 

682 ]) 

683 

684 def test_reset_advance_blockchain_exception(self): 

685 self.dongle.exchange.side_effect = [CommException("a-message")] 

686 with self.assertRaises(HSM2DongleError): 

687 self.hsm2dongle.reset_advance_blockchain() 

688 

689 self.assert_exchange([ 

690 [0x21, 0x01], 

691 ]) 

692 

693 

694class TestHSM2DongleAdvanceBlockchain(TestHSM2DongleBase): 

695 def setup_mocks(self, 

696 mmplsize_mock, 

697 get_cb_txn_mock, 

698 cb_txn_get_hash_mock, 

699 gbh_mock): 

700 mmplsize_mock.side_effect = lambda h: len(h)//8 

701 get_cb_txn_mock.side_effect = lambda h: {"cb_txn": h} 

702 cb_txn_get_hash_mock.side_effect = lambda h: \ 

703 (bytes([len(h["cb_txn"])//5])*4).hex() 

704 gbh_mock.return_value = "00" 

705 

706 @parameterized.expand([ 

707 ("partial_v2.0.x", 0x05, 2), 

708 ("total_v2.0.x", 0x06, 1), 

709 ("partial_v2.1.x", 0x05, 2), 

710 ("total_v2.1.x", 0x06, 1), 

711 ]) 

712 @patch("ledger.hsm2dongle.get_block_hash") 

713 @patch("ledger.hsm2dongle.coinbase_tx_get_hash") 

714 @patch("ledger.hsm2dongle.get_coinbase_txn") 

715 @patch("ledger.hsm2dongle.rlp_mm_payload_size") 

716 def test_advance_blockchain_ok( 

717 self, 

718 _, 

719 device_response, 

720 expected_response, 

721 mmplsize_mock, 

722 get_cb_txn_mock, 

723 cb_txn_get_hash_mock, 

724 gbh_mock, 

725 ): 

726 self.setup_mocks(mmplsize_mock, 

727 get_cb_txn_mock, 

728 cb_txn_get_hash_mock, 

729 gbh_mock) 

730 brothers_spec = [ 

731 # (brother list of brother bytes, chunk size) 

732 ([self.buf(190), self.buf(100)], 90), 

733 None, # 2nd block has no brothers 

734 ([self.buf(130)], 60), 

735 ] 

736 blocks_spec = [ 

737 # (block bytes, chunk size, brothers) 

738 (self.buf(300), 80, brothers_spec[0]), 

739 (self.buf(250), 100, brothers_spec[1]), 

740 (self.buf(140), 50, brothers_spec[2]), 

741 ] 

742 

743 self.dongle.exchange.side_effect = [ 

744 bs for excs in map(self.spec_to_exchange, blocks_spec) 

745 for bs in excs 

746 ] + [bytes([0, 0, device_response])] # Success response 

747 

748 blocks_hex = list(map(lambda bs: bs[0].hex(), blocks_spec)) 

749 brothers_list = list(map( 

750 lambda bs: list(map( 

751 lambda b: b.hex(), bs[0])) if bs else [], 

752 brothers_spec)) 

753 self.assertEqual( 

754 (True, expected_response), 

755 self.hsm2dongle.advance_blockchain(blocks_hex, brothers_list), 

756 ) 

757 

758 self.assert_exchange([ 

759 [0x10, 0x02, 0x00, 0x00, 0x00, 0x03], # Init, 3 blocks 

760 [0x10, 0x03, 0x00, 0x4B] + 

761 [0x78, 0x78, 0x78, 0x78], # Blk #1 meta 

762 [0x10, 0x04] + list(blocks_spec[0][0][80*0:80*1]), # Blk #1 chunk 

763 [0x10, 0x04] + list(blocks_spec[0][0][80*1:80*2]), # Blk #1 chunk 

764 [0x10, 0x04] + list(blocks_spec[0][0][80*2:80*3]), # Blk #1 chunk 

765 [0x10, 0x04] + list(blocks_spec[0][0][80*3:80*4]), # Blk #1 chunk 

766 [0x10, 0x07, 0x02], # Blk #1 brother count 

767 [0x10, 0x08, 0x00, 0x2f, 0x4c, 0x4c, 0x4c, 0x4c], # Blk #1 bro #1 meta 

768 [0x10, 0x09] + list(brothers_spec[0][0][0][90*0:90*1]), # Blk #1 bro #1 chunk 

769 [0x10, 0x09] + list(brothers_spec[0][0][0][90*1:90*2]), # Blk #1 bro #1 chunk 

770 [0x10, 0x09] + list(brothers_spec[0][0][0][90*2:90*3]), # Blk #1 bro #1 chunk 

771 [0x10, 0x08, 0x00, 0x19, 0x28, 0x28, 0x28, 0x28], # Blk #1 bro #2 meta 

772 [0x10, 0x09] + list(brothers_spec[0][0][1][90*0:90*1]), # Blk #1 bro #2 chunk 

773 [0x10, 0x09] + list(brothers_spec[0][0][1][90*1:90*2]), # Blk #1 bro #2 chunk 

774 [0x10, 0x03, 0x00, 0x3E] + 

775 [0x64, 0x64, 0x64, 0x64], # Blk #2 meta 

776 [0x10, 0x04] + list(blocks_spec[1][0][100*0:100*1]), # Blk #2 chunk 

777 [0x10, 0x04] + list(blocks_spec[1][0][100*1:100*2]), # Blk #2 chunk 

778 [0x10, 0x04] + list(blocks_spec[1][0][100*2:100*3]), # Blk #2 chunk 

779 [0x10, 0x07, 0x00], # Blk #2 brother count 

780 [0x10, 0x03, 0x00, 0x23] + 

781 [0x38, 0x38, 0x38, 0x38], # Blk #3 meta 

782 [0x10, 0x04] + list(blocks_spec[2][0][50*0:50*1]), # Blk #3 chunk 

783 [0x10, 0x04] + list(blocks_spec[2][0][50*1:50*2]), # Blk #3 chunk 

784 [0x10, 0x04] + list(blocks_spec[2][0][50*2:50*3]), # Blk #3 chunk 

785 [0x10, 0x07, 0x01], # Blk #3 brother count 

786 [0x10, 0x08, 0x00, 0x20, 0x34, 0x34, 0x34, 0x34], # Blk #3 bro #1 meta 

787 [0x10, 0x09] + list(brothers_spec[2][0][0][60*0:60*1]), # Blk #3 bro #1 chunk 

788 [0x10, 0x09] + list(brothers_spec[2][0][0][60*1:60*2]), # Blk #3 bro #1 chunk 

789 [0x10, 0x09] + list(brothers_spec[2][0][0][60*2:60*3]), # Blk #3 bro #1 chunk 

790 ]) 

791 

792 @parameterized.expand(TestHSM2DongleBase.CHUNK_ERROR_MAPPINGS) 

793 @patch("ledger.hsm2dongle.get_block_hash") 

794 @patch("ledger.hsm2dongle.coinbase_tx_get_hash") 

795 @patch("ledger.hsm2dongle.get_coinbase_txn") 

796 @patch("ledger.hsm2dongle.rlp_mm_payload_size") 

797 def test_advance_blockchain_chunk_error_result( 

798 self, 

799 _, 

800 error_code, 

801 response, 

802 mmplsize_mock, 

803 get_cb_txn_mock, 

804 cb_txn_get_hash_mock, 

805 gbh_mock, 

806 ): 

807 self.setup_mocks(mmplsize_mock, 

808 get_cb_txn_mock, 

809 cb_txn_get_hash_mock, 

810 gbh_mock) 

811 brothers_spec = [ 

812 # (brother list of brother bytes, chunk size) 

813 ([self.buf(190), self.buf(100)], 90), 

814 None, # 2nd block has no brothers 

815 ([self.buf(130)], 60), 

816 ] 

817 blocks_spec = [ 

818 # (block bytes, chunk size, brothers) 

819 (self.buf(300), 80, brothers_spec[0]), 

820 (self.buf(250), 100, brothers_spec[1]), 

821 (self.buf(140), 50, brothers_spec[2]), 

822 ] 

823 

824 side_effect = [ 

825 bs for excs in map(self.spec_to_exchange, blocks_spec) 

826 for bs in excs 

827 ] 

828 

829 # Make the second chunk of the second block fail 

830 # First block meta & chunks & bro metas & chunks 

831 # + second block meta & first & second chunk 

832 exchange_index = ( 

833 (1 + 300//80 + 1) + 1 + (1 + 190//90 + 1) + (1 + 100//90 + 1) + 3 

834 ) 

835 

836 if type(error_code) == bytes: 

837 side_effect[exchange_index] = error_code 

838 else: 

839 side_effect[exchange_index] = CommException("a-message", error_code) 

840 side_effect = side_effect[:exchange_index + 1] 

841 self.dongle.exchange.side_effect = side_effect 

842 

843 blocks_hex = list(map(lambda bs: bs[0].hex(), blocks_spec)) 

844 brothers_list = list(map( 

845 lambda bs: list(map( 

846 lambda b: b.hex(), bs[0])) if bs else [], 

847 brothers_spec)) 

848 

849 self.assertEqual( 

850 (False, response), 

851 self.hsm2dongle.advance_blockchain(blocks_hex, brothers_list), 

852 ) 

853 

854 self.assert_exchange([ 

855 [0x10, 0x02, 0x00, 0x00, 0x00, 0x03], # Init, 3 blocks 

856 [0x10, 0x03, 0x00, 0x4B] + 

857 [0x78, 0x78, 0x78, 0x78], # Blk #1 meta 

858 [0x10, 0x04] + list(blocks_spec[0][0][80*0:80*1]), # Blk #1 chunk 

859 [0x10, 0x04] + list(blocks_spec[0][0][80*1:80*2]), # Blk #1 chunk 

860 [0x10, 0x04] + list(blocks_spec[0][0][80*2:80*3]), # Blk #1 chunk 

861 [0x10, 0x04] + list(blocks_spec[0][0][80*3:80*4]), # Blk #1 chunk 

862 [0x10, 0x07, 0x02], # Blk #1 brother count 

863 [0x10, 0x08, 0x00, 0x2f, 0x4c, 0x4c, 0x4c, 0x4c], # Blk #1 bro #1 meta 

864 [0x10, 0x09] + list(brothers_spec[0][0][0][90*0:90*1]), # Blk #1 bro #1 chunk 

865 [0x10, 0x09] + list(brothers_spec[0][0][0][90*1:90*2]), # Blk #1 bro #1 chunk 

866 [0x10, 0x09] + list(brothers_spec[0][0][0][90*2:90*3]), # Blk #1 bro #1 chunk 

867 [0x10, 0x08, 0x00, 0x19, 0x28, 0x28, 0x28, 0x28], # Blk #1 bro #2 meta 

868 [0x10, 0x09] + list(brothers_spec[0][0][1][90*0:90*1]), # Blk #1 bro #2 chunk 

869 [0x10, 0x09] + list(brothers_spec[0][0][1][90*1:90*2]), # Blk #1 bro #2 chunk 

870 [0x10, 0x03, 0x00, 0x3E] + 

871 [0x64, 0x64, 0x64, 0x64], # Blk #2 meta 

872 [0x10, 0x04] + list(blocks_spec[1][0][100*0:100*1]), # Blk #2 chunk 

873 [0x10, 0x04] + list(blocks_spec[1][0][100*1:100*2]), # Blk #2 chunk 

874 ]) 

875 

876 @parameterized.expand([ 

877 ("prot_invalid", 0x6B87, -3), 

878 ("unexpected", 0x6BFF, -10), 

879 ("error_response", bytes([0, 0, 0xFF]), -10), 

880 ]) 

881 @patch("ledger.hsm2dongle.get_block_hash") 

882 @patch("ledger.hsm2dongle.coinbase_tx_get_hash") 

883 @patch("ledger.hsm2dongle.get_coinbase_txn") 

884 @patch("ledger.hsm2dongle.rlp_mm_payload_size") 

885 def test_advance_blockchain_metadata_error_result( 

886 self, 

887 _, 

888 error_code, 

889 response, 

890 mmplsize_mock, 

891 get_cb_txn_mock, 

892 cb_txn_get_hash_mock, 

893 gbh_mock, 

894 ): 

895 self.setup_mocks(mmplsize_mock, 

896 get_cb_txn_mock, 

897 cb_txn_get_hash_mock, 

898 gbh_mock) 

899 brothers_spec = [ 

900 # (brother list of brother bytes, chunk size) 

901 ([self.buf(190), self.buf(100)], 90), 

902 None, # 2nd block has no brothers 

903 ([self.buf(130)], 60), 

904 ] 

905 blocks_spec = [ 

906 # (block bytes, chunk size, brothers) 

907 (self.buf(300), 80, brothers_spec[0]), 

908 (self.buf(250), 100, brothers_spec[1]), 

909 (self.buf(140), 50, brothers_spec[2]), 

910 ] 

911 

912 side_effect = [ 

913 bs for excs in map(self.spec_to_exchange, blocks_spec) 

914 for bs in excs 

915 ] 

916 

917 # Make the metadata of the third block fail 

918 # First block meta & chunks & bro metas & chunks 

919 # + second block meta & chunks & bro meta 

920 # + third block meta 

921 exchange_index = ( 

922 (1 + 300//80 + 1) + 1 + (1 + 190//90 + 1) + (1 + 100//90 + 1) + 

923 (1 + 250//100 + 1) + 1 + 

924 1 

925 ) 

926 

927 if type(error_code) == bytes: 

928 side_effect[exchange_index] = error_code 

929 else: 

930 side_effect[exchange_index] = CommException("a-message", error_code) 

931 side_effect = side_effect[:exchange_index + 1] 

932 self.dongle.exchange.side_effect = side_effect 

933 

934 blocks_hex = list(map(lambda bs: bs[0].hex(), blocks_spec)) 

935 

936 brothers_list = list(map( 

937 lambda bs: list(map( 

938 lambda b: b.hex(), bs[0])) if bs else [], 

939 brothers_spec)) 

940 

941 self.assertEqual( 

942 (False, response), 

943 self.hsm2dongle.advance_blockchain(blocks_hex, brothers_list), 

944 ) 

945 

946 self.assert_exchange([ 

947 [0x10, 0x02, 0x00, 0x00, 0x00, 0x03], # Init, 3 blocks 

948 [0x10, 0x03, 0x00, 0x4B] + 

949 [0x78, 0x78, 0x78, 0x78], # Blk #1 meta 

950 [0x10, 0x04] + list(blocks_spec[0][0][80*0:80*1]), # Blk #1 chunk 

951 [0x10, 0x04] + list(blocks_spec[0][0][80*1:80*2]), # Blk #1 chunk 

952 [0x10, 0x04] + list(blocks_spec[0][0][80*2:80*3]), # Blk #1 chunk 

953 [0x10, 0x04] + list(blocks_spec[0][0][80*3:80*4]), # Blk #1 chunk 

954 [0x10, 0x07, 0x02], # Blk #1 brother count 

955 [0x10, 0x08, 0x00, 0x2f, 0x4c, 0x4c, 0x4c, 0x4c], # Blk #1 bro #1 meta 

956 [0x10, 0x09] + list(brothers_spec[0][0][0][90*0:90*1]), # Blk #1 bro #1 chunk 

957 [0x10, 0x09] + list(brothers_spec[0][0][0][90*1:90*2]), # Blk #1 bro #1 chunk 

958 [0x10, 0x09] + list(brothers_spec[0][0][0][90*2:90*3]), # Blk #1 bro #1 chunk 

959 [0x10, 0x08, 0x00, 0x19, 0x28, 0x28, 0x28, 0x28], # Blk #1 bro #2 meta 

960 [0x10, 0x09] + list(brothers_spec[0][0][1][90*0:90*1]), # Blk #1 bro #2 chunk 

961 [0x10, 0x09] + list(brothers_spec[0][0][1][90*1:90*2]), # Blk #1 bro #2 chunk 

962 [0x10, 0x03, 0x00, 0x3E] + 

963 [0x64, 0x64, 0x64, 0x64], # Blk #2 meta 

964 [0x10, 0x04] + list(blocks_spec[1][0][100*0:100*1]), # Blk #2 chunk 

965 [0x10, 0x04] + list(blocks_spec[1][0][100*1:100*2]), # Blk #2 chunk 

966 [0x10, 0x04] + list(blocks_spec[1][0][100*2:100*3]), # Blk #2 chunk 

967 [0x10, 0x07, 0x00], # Blk #2 brother count 

968 [0x10, 0x03, 0x00, 0x23] + 

969 [0x38, 0x38, 0x38, 0x38], # Blk #3 meta 

970 ]) 

971 

972 @patch("ledger.hsm2dongle.rlp_mm_payload_size") 

973 def test_advance_blockchain_metadata_error_generating(self, mmplsize_mock): 

974 mmplsize_mock.side_effect = ValueError() 

975 self.dongle.exchange.side_effect = [bytes([0, 0, 0x03])] 

976 

977 self.assertEqual( 

978 (False, -2), 

979 self.hsm2dongle.advance_blockchain(["first-block", "second-block"], 

980 [[], []]), 

981 ) 

982 

983 self.assert_exchange([ 

984 [0x10, 0x02, 0x00, 0x00, 0x00, 0x02], # Init, 2 blocks 

985 ]) 

986 self.assertEqual([call("first-block")], mmplsize_mock.call_args_list) 

987 

988 @parameterized.expand([ 

989 ("prot_invalid", CommException("a-message", 0x6B87), -1), 

990 ("unexpected", CommException("a-message", 0x6BFF), -10), 

991 ("invalid_response", bytes([0, 0, 0xFF]), -10), 

992 ]) 

993 def test_advance_blockchain_init_error(self, _, error, response): 

994 self.dongle.exchange.side_effect = [error] 

995 

996 self.assertEqual( 

997 (False, response), 

998 self.hsm2dongle.advance_blockchain(["first-block", "second-block"], 

999 [[], []]), 

1000 ) 

1001 

1002 self.assert_exchange([ 

1003 [0x10, 0x02, 0x00, 0x00, 0x00, 0x02], # Init, 2 blocks 

1004 ]) 

1005 

1006 

1007class TestHSM2DongleUpdateAncestor(TestHSM2DongleBase): 

1008 @patch("ledger.hsm2dongle.remove_mm_fields_if_present") 

1009 @patch("ledger.hsm2dongle.rlp_mm_payload_size") 

1010 def test_update_ancestor_ok(self, mmplsize_mock, rmvflds_mock): 

1011 rmvflds_mock.side_effect = lambda h: h[:-bytes.fromhex(h)[-1]*2] 

1012 mmplsize_mock.side_effect = lambda h: len(h)//8 

1013 blocks_spec = [ 

1014 # (block bytes, chunk size) 

1015 ( 

1016 self.buf(300) + 

1017 bytes.fromhex("aabbccddeeff0011220a"), 

1018 80, 

1019 ), 

1020 ( 

1021 self.buf(250) + 

1022 bytes.fromhex("1122334405"), 

1023 100, 

1024 ), 

1025 ( 

1026 self.buf(130) + 

1027 bytes.fromhex("334455aabbccdd2211982311aacdfe10"), 

1028 50, 

1029 ), 

1030 ] 

1031 

1032 self.dongle.exchange.side_effect = [ 

1033 bs for excs in map(lambda s: self.spec_to_exchange(s, trim=True), blocks_spec) 

1034 for bs in excs 

1035 ] + [bytes([0, 0, 0x05])] # Success response 

1036 

1037 blocks_hex = list(map(lambda bs: bs[0].hex(), blocks_spec)) 

1038 self.assertEqual((True, 1), 

1039 self.hsm2dongle.update_ancestor(blocks_hex)) 

1040 

1041 self.assert_exchange([ 

1042 [0x30, 0x02, 0x00, 0x00, 0x00, 0x03], # Init, 3 blocks 

1043 [0x30, 0x03, 0x00, 0x4B], # Block #1 meta 

1044 [0x30, 0x04] + list(blocks_spec[0][0][80*0:80*1]), # Block #1 chunk 

1045 [0x30, 0x04] + list(blocks_spec[0][0][80*1:80*2]), # Block #1 chunk 

1046 [0x30, 0x04] + list(blocks_spec[0][0][80*2:80*3]), # Block #1 chunk 

1047 [0x30, 0x04] + 

1048 list(blocks_spec[0][0][80*3:80*4][:-blocks_spec[0][0][-1]]), # Block #1 chunk 

1049 [0x30, 0x03, 0x00, 0x3E], # Block #2 meta 

1050 [0x30, 0x04] + list(blocks_spec[1][0][100*0:100*1]), # Block #2 chunk 

1051 [0x30, 0x04] + list(blocks_spec[1][0][100*1:100*2]), # Block #2 chunk 

1052 [0x30, 0x04] + 

1053 list(blocks_spec[1][0][100*2:100 * 

1054 3][:-blocks_spec[1][0][-1]]), # Block #2 chunk 

1055 [0x30, 0x03, 0x00, 0x20], # Block #3 meta 

1056 [0x30, 0x04] + list(blocks_spec[2][0][50*0:50*1]), # Block #2 chunk 

1057 [0x30, 0x04] + list(blocks_spec[2][0][50*1:50*2]), # Block #3 chunk 

1058 [0x30, 0x04] + 

1059 list(blocks_spec[2][0][50*2:50*3][:-blocks_spec[2][0][-1]]), # Block #3 chunk 

1060 ]) 

1061 

1062 @parameterized.expand([ 

1063 ("prot_invalid", 0x6B87, -4), 

1064 ("rlp_invalid", 0x6B88, -5), 

1065 ("block_too_old", 0x6B89, -5), 

1066 ("block_too_short", 0x6B8A, -5), 

1067 ("parent_hash_invalid", 0x6B8B, -5), 

1068 ("receipt_root_invalid", 0x6B8C, -5), 

1069 ("block_num_invalid", 0x6B8D, -5), 

1070 ("btc_header_invalid", 0x6B90, -5), 

1071 ("mm_rlp_len_mismatch", 0x6B93, -5), 

1072 ("buffer_overflow", 0x6B99, -5), 

1073 ("chain_mismatch", 0x6B9A, -6), 

1074 ("ancestor_tip_mismatch", 0x6B9C, -7), 

1075 ("unexpected", 0x6BFF, -10), 

1076 ("error_response", bytes([0, 0, 0xFF]), -10), 

1077 ]) 

1078 @patch("ledger.hsm2dongle.remove_mm_fields_if_present") 

1079 @patch("ledger.hsm2dongle.rlp_mm_payload_size") 

1080 def test_update_ancestor_chunk_error_result(self, _, error_code, response, 

1081 mmplsize_mock, rmvflds_mock): 

1082 rmvflds_mock.side_effect = lambda h: h 

1083 mmplsize_mock.side_effect = lambda h: len(h)//8 

1084 blocks_spec = [ 

1085 # (block bytes, chunk size) 

1086 (self.buf(300), 80), 

1087 (self.buf(250), 100), 

1088 (self.buf(140), 50), 

1089 ] 

1090 

1091 side_effect = [ 

1092 bs for excs in map(self.spec_to_exchange, blocks_spec) 

1093 for bs in excs 

1094 ] 

1095 # Make the second chunk of the second block fail 

1096 exchange_index = ( 

1097 1 + (300//80 + 2) + 2 

1098 ) # Init + first block meta & chunks + second block meta & first chunk 

1099 if type(error_code) == bytes: 

1100 side_effect[exchange_index] = error_code 

1101 else: 

1102 side_effect[exchange_index] = CommException("a-message", error_code) 

1103 side_effect = side_effect[:exchange_index + 1] 

1104 self.dongle.exchange.side_effect = side_effect 

1105 

1106 blocks_hex = list(map(lambda bs: bs[0].hex(), blocks_spec)) 

1107 self.assertEqual( 

1108 (False, response), 

1109 self.hsm2dongle.update_ancestor(blocks_hex), 

1110 ) 

1111 

1112 self.assert_exchange([ 

1113 [0x30, 0x02, 0x00, 0x00, 0x00, 0x03], # Init, 3 blocks 

1114 [0x30, 0x03, 0x00, 0x4B], # Block #1 meta 

1115 [0x30, 0x04] + list(blocks_spec[0][0][80*0:80*1]), # Block #1 chunk 

1116 [0x30, 0x04] + list(blocks_spec[0][0][80*1:80*2]), # Block #1 chunk 

1117 [0x30, 0x04] + list(blocks_spec[0][0][80*2:80*3]), # Block #1 chunk 

1118 [0x30, 0x04] + list(blocks_spec[0][0][80*3:80*4]), # Block #1 chunk 

1119 [0x30, 0x03, 0x00, 0x3E], # Block #2 meta 

1120 [0x30, 0x04] + list(blocks_spec[1][0][100*0:100*1]), # Block #2 chunk 

1121 [0x30, 0x04] + list(blocks_spec[1][0][100*1:100*2]), # Block #2 chunk 

1122 ]) 

1123 

1124 @parameterized.expand([ 

1125 ("prot_invalid", 0x6B87, -3), 

1126 ("unexpected", 0x6BFF, -10), 

1127 ("error_response", bytes([0, 0, 0xFF]), -10), 

1128 ]) 

1129 @patch("ledger.hsm2dongle.remove_mm_fields_if_present") 

1130 @patch("ledger.hsm2dongle.rlp_mm_payload_size") 

1131 def test_update_ancestor_metadata_error_result(self, _, error_code, response, 

1132 mmplsize_mock, rmvflds_mock): 

1133 rmvflds_mock.side_effect = lambda h: h 

1134 mmplsize_mock.side_effect = lambda h: len(h)//8 

1135 blocks_spec = [ 

1136 # (block bytes, chunk size) 

1137 (self.buf(300), 80), 

1138 (self.buf(250), 100), 

1139 (self.buf(140), 50), 

1140 ] 

1141 

1142 side_effect = [ 

1143 bs for excs in map(self.spec_to_exchange, blocks_spec) 

1144 for bs in excs 

1145 ] 

1146 # Make the metadata of the third block fail 

1147 exchange_index = ( 

1148 1 + (300//80 + 2) + (250//100 + 2) 

1149 ) # Init + first and second block meta & chunks + third block meta 

1150 if type(error_code) == bytes: 

1151 side_effect[exchange_index] = error_code 

1152 else: 

1153 side_effect[exchange_index] = CommException("a-message", error_code) 

1154 side_effect = side_effect[:exchange_index + 1] 

1155 self.dongle.exchange.side_effect = side_effect 

1156 

1157 blocks_hex = list(map(lambda bs: bs[0].hex(), blocks_spec)) 

1158 self.assertEqual( 

1159 (False, response), 

1160 self.hsm2dongle.update_ancestor(blocks_hex), 

1161 ) 

1162 

1163 self.assert_exchange([ 

1164 [0x30, 0x02, 0x00, 0x00, 0x00, 0x03], # Init, 3 blocks 

1165 [0x30, 0x03, 0x00, 0x4B], # Block #1 meta 

1166 [0x30, 0x04] + list(blocks_spec[0][0][80*0:80*1]), # Block #1 chunk 

1167 [0x30, 0x04] + list(blocks_spec[0][0][80*1:80*2]), # Block #1 chunk 

1168 [0x30, 0x04] + list(blocks_spec[0][0][80*2:80*3]), # Block #1 chunk 

1169 [0x30, 0x04] + list(blocks_spec[0][0][80*3:80*4]), # Block #1 chunk 

1170 [0x30, 0x03, 0x00, 0x3E], # Block #2 meta 

1171 [0x30, 0x04] + list(blocks_spec[1][0][100*0:100*1]), # Block #2 chunk 

1172 [0x30, 0x04] + list(blocks_spec[1][0][100*1:100*2]), # Block #2 chunk 

1173 [0x30, 0x04] + list(blocks_spec[1][0][100*2:100*3]), # Block #2 chunk 

1174 [0x30, 0x03, 0x00, 0x23], # Block #3 meta 

1175 ]) 

1176 

1177 @patch("ledger.hsm2dongle.remove_mm_fields_if_present") 

1178 @patch("ledger.hsm2dongle.rlp_mm_payload_size") 

1179 def test_update_ancestor_metadata_error_generating(self, mmplsize_mock, rmvflds_mock): 

1180 rmvflds_mock.side_effect = lambda h: h 

1181 mmplsize_mock.side_effect = ValueError() 

1182 self.dongle.exchange.side_effect = [bytes([0, 0, 0x03])] 

1183 

1184 self.assertEqual( 

1185 (False, -2), 

1186 self.hsm2dongle.update_ancestor(["first-block", "second-block"]), 

1187 ) 

1188 

1189 self.assert_exchange([ 

1190 [0x30, 0x02, 0x00, 0x00, 0x00, 0x02], # Init, 2 blocks 

1191 ]) 

1192 self.assertEqual([call("first-block")], mmplsize_mock.call_args_list) 

1193 

1194 @parameterized.expand([ 

1195 ("prot_invalid", CommException("a-message", 0x6B87), -1), 

1196 ("unexpected", CommException("a-message", 0x6BFF), -10), 

1197 ("invalid_response", bytes([0, 0, 0xFF]), -10), 

1198 ]) 

1199 @patch("ledger.hsm2dongle.remove_mm_fields_if_present") 

1200 def test_update_ancestor_init_error(self, _, error, response, rmvflds_mock): 

1201 rmvflds_mock.side_effect = lambda h: h 

1202 self.dongle.exchange.side_effect = [error] 

1203 

1204 self.assertEqual( 

1205 (False, response), 

1206 self.hsm2dongle.update_ancestor(["first-block", "second-block"]), 

1207 ) 

1208 

1209 self.assert_exchange([ 

1210 [0x30, 0x02, 0x00, 0x00, 0x00, 0x02], # Init, 2 blocks 

1211 ]) 

1212 

1213 @patch("ledger.hsm2dongle.remove_mm_fields_if_present") 

1214 def test_update_ancestor_remove_mmfields_exception(self, rmvflds_mock): 

1215 rmvflds_mock.side_effect = ValueError("an error") 

1216 

1217 self.assertEqual( 

1218 (False, -8), 

1219 self.hsm2dongle.update_ancestor(["first-block", "second-block"]), 

1220 ) 

1221 

1222 self.assert_exchange([]) 

1223 

1224 def test_authorize_signer_ok(self): 

1225 self.dongle.exchange.side_effect = [ 

1226 bytes(), # Response to hash, iteration - doesn't matter 

1227 bytes.fromhex("aaaaaa01"), # Response to first signature, MORE 

1228 bytes.fromhex("aaaaaa02"), # Response to second signature, OK 

1229 ] 

1230 

1231 self.assertTrue(self.hsm2dongle.authorize_signer(Mock( 

1232 signer_version=Mock(hash="ee"*32, iteration=0x4321), 

1233 signatures=["aa"*20, "bb"*25] 

1234 ))) 

1235 

1236 self.assert_exchange([ 

1237 [0x51, 0x01] + [0xee]*32 + [0x43, 0x21], # Sigver, hash plus iteration 

1238 [0x51, 0x02] + [0xaa]*20, # Signature #1 

1239 [0x51, 0x02] + [0xbb]*25, # Signature #2 

1240 ]) 

1241 

1242 def test_authorize_signer_ok_first_sig(self): 

1243 self.dongle.exchange.side_effect = [ 

1244 bytes(), # Response to hash, iteration - doesn't matter 

1245 bytes.fromhex("aaaaaa02"), # Response to first signature, OK 

1246 ] 

1247 

1248 self.assertTrue(self.hsm2dongle.authorize_signer(Mock( 

1249 signer_version=Mock(hash="ee"*32, iteration=0x4321), 

1250 signatures=["aa"*20, "bb"*25] 

1251 ))) 

1252 

1253 self.assert_exchange([ 

1254 [0x51, 0x01] + [0xee]*32 + [0x43, 0x21], # Sigver, hash plus iteration 

1255 [0x51, 0x02] + [0xaa]*20, # Signature #1 

1256 ]) 

1257 

1258 def test_authorize_signer_sigver_error(self): 

1259 self.dongle.exchange.side_effect = [ 

1260 CommException("an-error"), # Response to hash, iteration - error 

1261 ] 

1262 

1263 with self.assertRaises(HSM2DongleError): 

1264 self.hsm2dongle.authorize_signer(Mock( 

1265 signer_version=Mock(hash="ee"*32, iteration=0x4321), 

1266 signatures=["aa"*20, "bb"*25] 

1267 )) 

1268 

1269 self.assert_exchange([ 

1270 [0x51, 0x01] + [0xee]*32 + [0x43, 0x21], # Sigver, hash plus iteration 

1271 ]) 

1272 

1273 def test_authorize_signer_signature_error(self): 

1274 self.dongle.exchange.side_effect = [ 

1275 bytes(), # Response to hash, iteration - doesn't matter 

1276 bytes.fromhex("aaaaaa01"), # Response to first signature, MORE 

1277 CommException("an-error"), # Response to second signature, ERROR 

1278 ] 

1279 

1280 with self.assertRaises(HSM2DongleError): 

1281 self.hsm2dongle.authorize_signer(Mock( 

1282 signer_version=Mock(hash="ee"*32, iteration=0x4321), 

1283 signatures=["aa"*20, "bb"*25] 

1284 )) 

1285 

1286 self.assert_exchange([ 

1287 [0x51, 0x01] + [0xee]*32 + [0x43, 0x21], # Sigver, hash plus iteration 

1288 [0x51, 0x02] + [0xaa]*20, # Signature #1 

1289 [0x51, 0x02] + [0xbb]*25, # Signature #2 

1290 ]) 

1291 

1292 def test_authorize_not_enough_signatures(self): 

1293 self.dongle.exchange.side_effect = [ 

1294 bytes(), # Response to hash, iteration - doesn't matter 

1295 bytes.fromhex("aaaaaa01"), # Response to first signature, MORE 

1296 bytes.fromhex("aaaaaa01"), # Response to second signature, MORE 

1297 ] 

1298 

1299 with self.assertRaises(HSM2DongleError): 

1300 self.hsm2dongle.authorize_signer(Mock( 

1301 signer_version=Mock(hash="ee"*32, iteration=0x4321), 

1302 signatures=["aa"*20, "bb"*25] 

1303 )) 

1304 

1305 self.assert_exchange([ 

1306 [0x51, 0x01] + [0xee]*32 + [0x43, 0x21], # Sigver, hash plus iteration 

1307 [0x51, 0x02] + [0xaa]*20, # Signature #1 

1308 [0x51, 0x02] + [0xbb]*25, # Signature #2 

1309 ])