Coverage for tests/ledger/hsm2dongle_cmds/test_hsm2dongle_update_ancestor.py: 100%

96 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23from unittest.mock import Mock, patch, call 

24from parameterized import parameterized 

25from ..test_hsm2dongle import TestHSM2DongleBase 

26from ledger.hsm2dongle import HSM2DongleError 

27from ledgerblue.commException import CommException 

28 

29import logging 

30 

31logging.disable(logging.CRITICAL) 

32 

33 

34class TestHSM2DongleUpdateAncestor(TestHSM2DongleBase): 

35 @patch("ledger.hsm2dongle.remove_mm_fields_if_present") 

36 @patch("ledger.hsm2dongle.rlp_mm_payload_size") 

37 def test_update_ancestor_ok(self, mmplsize_mock, rmvflds_mock): 

38 rmvflds_mock.side_effect = lambda h: h[:-bytes.fromhex(h)[-1]*2] 

39 mmplsize_mock.side_effect = lambda h: len(h)//8 

40 blocks_spec = [ 

41 # (block bytes, chunk size) 

42 ( 

43 self.buf(300) + 

44 bytes.fromhex("aabbccddeeff0011220a"), 

45 80, 

46 ), 

47 ( 

48 self.buf(250) + 

49 bytes.fromhex("1122334405"), 

50 100, 

51 ), 

52 ( 

53 self.buf(130) + 

54 bytes.fromhex("334455aabbccdd2211982311aacdfe10"), 

55 50, 

56 ), 

57 ] 

58 

59 self.dongle.exchange.side_effect = [ 

60 bs for excs in map(lambda s: self.spec_to_exchange(s, trim=True), blocks_spec) 

61 for bs in excs 

62 ] + [bytes([0, 0, 0x05])] # Success response 

63 

64 blocks_hex = list(map(lambda bs: bs[0].hex(), blocks_spec)) 

65 self.assertEqual((True, 1), 

66 self.hsm2dongle.update_ancestor(blocks_hex)) 

67 

68 self.assert_exchange([ 

69 [0x30, 0x02, 0x00, 0x00, 0x00, 0x03], # Init, 3 blocks 

70 [0x30, 0x03, 0x00, 0x4B], # Block #1 meta 

71 [0x30, 0x04] + list(blocks_spec[0][0][80*0:80*1]), # Block #1 chunk 

72 [0x30, 0x04] + list(blocks_spec[0][0][80*1:80*2]), # Block #1 chunk 

73 [0x30, 0x04] + list(blocks_spec[0][0][80*2:80*3]), # Block #1 chunk 

74 [0x30, 0x04] + 

75 list(blocks_spec[0][0][80*3:80*4][:-blocks_spec[0][0][-1]]), # Block #1 chunk 

76 [0x30, 0x03, 0x00, 0x3E], # Block #2 meta 

77 [0x30, 0x04] + list(blocks_spec[1][0][100*0:100*1]), # Block #2 chunk 

78 [0x30, 0x04] + list(blocks_spec[1][0][100*1:100*2]), # Block #2 chunk 

79 [0x30, 0x04] + 

80 list(blocks_spec[1][0][100*2:100 * 

81 3][:-blocks_spec[1][0][-1]]), # Block #2 chunk 

82 [0x30, 0x03, 0x00, 0x20], # Block #3 meta 

83 [0x30, 0x04] + list(blocks_spec[2][0][50*0:50*1]), # Block #2 chunk 

84 [0x30, 0x04] + list(blocks_spec[2][0][50*1:50*2]), # Block #3 chunk 

85 [0x30, 0x04] + 

86 list(blocks_spec[2][0][50*2:50*3][:-blocks_spec[2][0][-1]]), # Block #3 chunk 

87 ]) 

88 

89 @parameterized.expand([ 

90 ("prot_invalid", 0x6B87, -4), 

91 ("rlp_invalid", 0x6B88, -5), 

92 ("block_too_old", 0x6B89, -5), 

93 ("block_too_short", 0x6B8A, -5), 

94 ("parent_hash_invalid", 0x6B8B, -5), 

95 ("receipt_root_invalid", 0x6B8C, -5), 

96 ("block_num_invalid", 0x6B8D, -5), 

97 ("btc_header_invalid", 0x6B90, -5), 

98 ("mm_rlp_len_mismatch", 0x6B93, -5), 

99 ("buffer_overflow", 0x6B99, -5), 

100 ("chain_mismatch", 0x6B9A, -6), 

101 ("ancestor_tip_mismatch", 0x6B9C, -7), 

102 ("unexpected", 0x6BFF, -10), 

103 ("error_response", bytes([0, 0, 0xFF]), -10), 

104 ]) 

105 @patch("ledger.hsm2dongle.remove_mm_fields_if_present") 

106 @patch("ledger.hsm2dongle.rlp_mm_payload_size") 

107 def test_update_ancestor_chunk_error_result(self, _, error_code, response, 

108 mmplsize_mock, rmvflds_mock): 

109 rmvflds_mock.side_effect = lambda h: h 

110 mmplsize_mock.side_effect = lambda h: len(h)//8 

111 blocks_spec = [ 

112 # (block bytes, chunk size) 

113 (self.buf(300), 80), 

114 (self.buf(250), 100), 

115 (self.buf(140), 50), 

116 ] 

117 

118 side_effect = [ 

119 bs for excs in map(self.spec_to_exchange, blocks_spec) 

120 for bs in excs 

121 ] 

122 # Make the second chunk of the second block fail 

123 exchange_index = ( 

124 1 + (300//80 + 2) + 2 

125 ) # Init + first block meta & chunks + second block meta & first chunk 

126 if type(error_code) == bytes: 

127 side_effect[exchange_index] = error_code 

128 else: 

129 side_effect[exchange_index] = CommException("a-message", error_code) 

130 side_effect = side_effect[:exchange_index + 1] 

131 self.dongle.exchange.side_effect = side_effect 

132 

133 blocks_hex = list(map(lambda bs: bs[0].hex(), blocks_spec)) 

134 self.assertEqual( 

135 (False, response), 

136 self.hsm2dongle.update_ancestor(blocks_hex), 

137 ) 

138 

139 self.assert_exchange([ 

140 [0x30, 0x02, 0x00, 0x00, 0x00, 0x03], # Init, 3 blocks 

141 [0x30, 0x03, 0x00, 0x4B], # Block #1 meta 

142 [0x30, 0x04] + list(blocks_spec[0][0][80*0:80*1]), # Block #1 chunk 

143 [0x30, 0x04] + list(blocks_spec[0][0][80*1:80*2]), # Block #1 chunk 

144 [0x30, 0x04] + list(blocks_spec[0][0][80*2:80*3]), # Block #1 chunk 

145 [0x30, 0x04] + list(blocks_spec[0][0][80*3:80*4]), # Block #1 chunk 

146 [0x30, 0x03, 0x00, 0x3E], # Block #2 meta 

147 [0x30, 0x04] + list(blocks_spec[1][0][100*0:100*1]), # Block #2 chunk 

148 [0x30, 0x04] + list(blocks_spec[1][0][100*1:100*2]), # Block #2 chunk 

149 ]) 

150 

151 @parameterized.expand([ 

152 ("prot_invalid", 0x6B87, -3), 

153 ("unexpected", 0x6BFF, -10), 

154 ("error_response", bytes([0, 0, 0xFF]), -10), 

155 ]) 

156 @patch("ledger.hsm2dongle.remove_mm_fields_if_present") 

157 @patch("ledger.hsm2dongle.rlp_mm_payload_size") 

158 def test_update_ancestor_metadata_error_result(self, _, error_code, response, 

159 mmplsize_mock, rmvflds_mock): 

160 rmvflds_mock.side_effect = lambda h: h 

161 mmplsize_mock.side_effect = lambda h: len(h)//8 

162 blocks_spec = [ 

163 # (block bytes, chunk size) 

164 (self.buf(300), 80), 

165 (self.buf(250), 100), 

166 (self.buf(140), 50), 

167 ] 

168 

169 side_effect = [ 

170 bs for excs in map(self.spec_to_exchange, blocks_spec) 

171 for bs in excs 

172 ] 

173 # Make the metadata of the third block fail 

174 exchange_index = ( 

175 1 + (300//80 + 2) + (250//100 + 2) 

176 ) # Init + first and second block meta & chunks + third block meta 

177 if type(error_code) == bytes: 

178 side_effect[exchange_index] = error_code 

179 else: 

180 side_effect[exchange_index] = CommException("a-message", error_code) 

181 side_effect = side_effect[:exchange_index + 1] 

182 self.dongle.exchange.side_effect = side_effect 

183 

184 blocks_hex = list(map(lambda bs: bs[0].hex(), blocks_spec)) 

185 self.assertEqual( 

186 (False, response), 

187 self.hsm2dongle.update_ancestor(blocks_hex), 

188 ) 

189 

190 self.assert_exchange([ 

191 [0x30, 0x02, 0x00, 0x00, 0x00, 0x03], # Init, 3 blocks 

192 [0x30, 0x03, 0x00, 0x4B], # Block #1 meta 

193 [0x30, 0x04] + list(blocks_spec[0][0][80*0:80*1]), # Block #1 chunk 

194 [0x30, 0x04] + list(blocks_spec[0][0][80*1:80*2]), # Block #1 chunk 

195 [0x30, 0x04] + list(blocks_spec[0][0][80*2:80*3]), # Block #1 chunk 

196 [0x30, 0x04] + list(blocks_spec[0][0][80*3:80*4]), # Block #1 chunk 

197 [0x30, 0x03, 0x00, 0x3E], # Block #2 meta 

198 [0x30, 0x04] + list(blocks_spec[1][0][100*0:100*1]), # Block #2 chunk 

199 [0x30, 0x04] + list(blocks_spec[1][0][100*1:100*2]), # Block #2 chunk 

200 [0x30, 0x04] + list(blocks_spec[1][0][100*2:100*3]), # Block #2 chunk 

201 [0x30, 0x03, 0x00, 0x23], # Block #3 meta 

202 ]) 

203 

204 @patch("ledger.hsm2dongle.remove_mm_fields_if_present") 

205 @patch("ledger.hsm2dongle.rlp_mm_payload_size") 

206 def test_update_ancestor_metadata_error_generating(self, mmplsize_mock, rmvflds_mock): 

207 rmvflds_mock.side_effect = lambda h: h 

208 mmplsize_mock.side_effect = ValueError() 

209 self.dongle.exchange.side_effect = [bytes([0, 0, 0x03])] 

210 

211 self.assertEqual( 

212 (False, -2), 

213 self.hsm2dongle.update_ancestor(["first-block", "second-block"]), 

214 ) 

215 

216 self.assert_exchange([ 

217 [0x30, 0x02, 0x00, 0x00, 0x00, 0x02], # Init, 2 blocks 

218 ]) 

219 self.assertEqual([call("first-block")], mmplsize_mock.call_args_list) 

220 

221 @parameterized.expand([ 

222 ("prot_invalid", CommException("a-message", 0x6B87), -1), 

223 ("unexpected", CommException("a-message", 0x6BFF), -10), 

224 ("invalid_response", bytes([0, 0, 0xFF]), -10), 

225 ]) 

226 @patch("ledger.hsm2dongle.remove_mm_fields_if_present") 

227 def test_update_ancestor_init_error(self, _, error, response, rmvflds_mock): 

228 rmvflds_mock.side_effect = lambda h: h 

229 self.dongle.exchange.side_effect = [error] 

230 

231 self.assertEqual( 

232 (False, response), 

233 self.hsm2dongle.update_ancestor(["first-block", "second-block"]), 

234 ) 

235 

236 self.assert_exchange([ 

237 [0x30, 0x02, 0x00, 0x00, 0x00, 0x02], # Init, 2 blocks 

238 ]) 

239 

240 @patch("ledger.hsm2dongle.remove_mm_fields_if_present") 

241 def test_update_ancestor_remove_mmfields_exception(self, rmvflds_mock): 

242 rmvflds_mock.side_effect = ValueError("an error") 

243 

244 self.assertEqual( 

245 (False, -8), 

246 self.hsm2dongle.update_ancestor(["first-block", "second-block"]), 

247 ) 

248 

249 self.assert_exchange([]) 

250 

251 def test_authorize_signer_ok(self): 

252 self.dongle.exchange.side_effect = [ 

253 bytes(), # Response to hash, iteration - doesn't matter 

254 bytes.fromhex("aaaaaa01"), # Response to first signature, MORE 

255 bytes.fromhex("aaaaaa02"), # Response to second signature, OK 

256 ] 

257 

258 self.assertTrue(self.hsm2dongle.authorize_signer(Mock( 

259 signer_version=Mock(hash="ee"*32, iteration=0x4321), 

260 signatures=["aa"*20, "bb"*25] 

261 ))) 

262 

263 self.assert_exchange([ 

264 [0x51, 0x01] + [0xee]*32 + [0x43, 0x21], # Sigver, hash plus iteration 

265 [0x51, 0x02] + [0xaa]*20, # Signature #1 

266 [0x51, 0x02] + [0xbb]*25, # Signature #2 

267 ]) 

268 

269 def test_authorize_signer_ok_first_sig(self): 

270 self.dongle.exchange.side_effect = [ 

271 bytes(), # Response to hash, iteration - doesn't matter 

272 bytes.fromhex("aaaaaa02"), # Response to first signature, OK 

273 ] 

274 

275 self.assertTrue(self.hsm2dongle.authorize_signer(Mock( 

276 signer_version=Mock(hash="ee"*32, iteration=0x4321), 

277 signatures=["aa"*20, "bb"*25] 

278 ))) 

279 

280 self.assert_exchange([ 

281 [0x51, 0x01] + [0xee]*32 + [0x43, 0x21], # Sigver, hash plus iteration 

282 [0x51, 0x02] + [0xaa]*20, # Signature #1 

283 ]) 

284 

285 def test_authorize_signer_sigver_error(self): 

286 self.dongle.exchange.side_effect = [ 

287 CommException("an-error"), # Response to hash, iteration - error 

288 ] 

289 

290 with self.assertRaises(HSM2DongleError): 

291 self.hsm2dongle.authorize_signer(Mock( 

292 signer_version=Mock(hash="ee"*32, iteration=0x4321), 

293 signatures=["aa"*20, "bb"*25] 

294 )) 

295 

296 self.assert_exchange([ 

297 [0x51, 0x01] + [0xee]*32 + [0x43, 0x21], # Sigver, hash plus iteration 

298 ]) 

299 

300 def test_authorize_signer_signature_error(self): 

301 self.dongle.exchange.side_effect = [ 

302 bytes(), # Response to hash, iteration - doesn't matter 

303 bytes.fromhex("aaaaaa01"), # Response to first signature, MORE 

304 CommException("an-error"), # Response to second signature, ERROR 

305 ] 

306 

307 with self.assertRaises(HSM2DongleError): 

308 self.hsm2dongle.authorize_signer(Mock( 

309 signer_version=Mock(hash="ee"*32, iteration=0x4321), 

310 signatures=["aa"*20, "bb"*25] 

311 )) 

312 

313 self.assert_exchange([ 

314 [0x51, 0x01] + [0xee]*32 + [0x43, 0x21], # Sigver, hash plus iteration 

315 [0x51, 0x02] + [0xaa]*20, # Signature #1 

316 [0x51, 0x02] + [0xbb]*25, # Signature #2 

317 ]) 

318 

319 def test_authorize_not_enough_signatures(self): 

320 self.dongle.exchange.side_effect = [ 

321 bytes(), # Response to hash, iteration - doesn't matter 

322 bytes.fromhex("aaaaaa01"), # Response to first signature, MORE 

323 bytes.fromhex("aaaaaa01"), # Response to second signature, MORE 

324 ] 

325 

326 with self.assertRaises(HSM2DongleError): 

327 self.hsm2dongle.authorize_signer(Mock( 

328 signer_version=Mock(hash="ee"*32, iteration=0x4321), 

329 signatures=["aa"*20, "bb"*25] 

330 )) 

331 

332 self.assert_exchange([ 

333 [0x51, 0x01] + [0xee]*32 + [0x43, 0x21], # Sigver, hash plus iteration 

334 [0x51, 0x02] + [0xaa]*20, # Signature #1 

335 [0x51, 0x02] + [0xbb]*25, # Signature #2 

336 ])