Coverage for tests/sgx/hsm2dongle_cmds/test_hsm2dongle_update_ancestor.py: 96%

107 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

# The MIT License (MIT)
#
# Copyright (c) 2021 RSK Labs Ltd
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is furnished to do
# so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

22 

import logging
from itertools import chain
from unittest.mock import Mock, patch, call

from parameterized import parameterized
from ledgerblue.commException import CommException

from ledger.hsm2dongle import HSM2DongleError
from ..test_hsm2dongle import TestHSM2DongleBase, HSM2DongleTestMode

30 

# Suppress every log record of severity CRITICAL and below while this module's
# tests run, so the code under test does not write log output into test results.
logging.disable(logging.CRITICAL)

32 

33 

class TestHSM2DongleSGXUpdateAncestor(TestHSM2DongleBase):
    """Tests for ``update_ancestor`` and ``authorize_signer`` in SGX test mode.

    Every test scripts the replies of ``self.dongle.exchange`` through its
    ``side_effect``, calls a method on ``self.hsm2dongle`` and then checks both
    the returned value and the exact sequence of messages that were sent, using
    ``self.assert_exchange``.

    ``dongle``, ``hsm2dongle``, ``buf`` and ``assert_exchange`` are not defined
    in this class; presumably they are provided by ``TestHSM2DongleBase``.
    """

    def get_test_mode(self):
        """Return the SGX test mode (presumably read by the base class setup)."""
        return HSM2DongleTestMode.SGX

    def spec_to_exchange(self, spec, trim=False):
        """Build the scripted dongle replies for a single block spec.

        Args:
            spec: Block spec tuple. Only its length and, for 3-element specs,
                ``spec[2]`` (the brothers entry) are inspected here.
            trim: Accepted for call compatibility; not used by this method.

        Returns:
            A list of ``bytes`` replies: two for the block header and, for
            3-element specs, one brother-list metadata reply followed by two
            replies per brother in ``spec[2][0]`` when ``spec[2]`` is not
            ``None``.
        """
        # Chunk size is ignored when requesting block data, full bh is always sent

        # Block header
        exchanges = [
            bytes([0, 0, 0x03]),
            bytes([0, 0, 0x04, 0xAA]),
        ]

        # Spec has brothers?
        if len(spec) == 3:
            exchanges.append(bytes([0, 0, 0x07]))  # Request brother list metadata
            if spec[2] is not None:
                # Two replies per brother; bytes are immutable, so repeating the
                # pair is equivalent to appending it once per brother
                brother_replies = [bytes([0, 0, 0x08]), bytes([0, 0, 0x09, 0xBB])]
                exchanges += brother_replies * len(spec[2][0])

        return exchanges

    def _replies_failing_at(self, blocks_spec, exchange_index, error_code):
        """Return the scripted replies for ``blocks_spec``, cut off at a failure.

        The reply at position ``exchange_index`` is replaced by the failure and
        every later reply is dropped.

        Args:
            blocks_spec: Block specs, each converted with ``spec_to_exchange``.
            exchange_index: Zero-based position of the reply to replace.
            error_code: A ``bytes`` value is used as the raw reply; any other
                value is wrapped in ``CommException("a-message", error_code)``.

        Returns:
            The list of replies, ``exchange_index + 1`` elements long.
        """
        replies = list(chain.from_iterable(map(self.spec_to_exchange, blocks_spec)))
        if isinstance(error_code, bytes):
            replies[exchange_index] = error_code
        else:
            replies[exchange_index] = CommException("a-message", error_code)
        return replies[:exchange_index + 1]

    def _two_signature_authorization(self):
        """Return a mocked signer authorization carrying two signatures."""
        return Mock(
            signer_version=Mock(hash="ee"*32, iteration=0x4321),
            signatures=["aa"*20, "bb"*25],
        )

    @patch("ledger.hsm2dongle.remove_mm_fields_if_present")
    @patch("ledger.hsm2dongle.rlp_mm_payload_size")
    def test_update_ancestor_ok(self, mmplsize_mock, rmvflds_mock):
        """Three blocks are sent with their trailing bytes stripped; success."""
        # Fake field removal: the last byte of each block says how many trailing
        # bytes to drop (the input is a hex string, hence two characters per byte)
        rmvflds_mock.side_effect = lambda h: h[:-bytes.fromhex(h)[-1]*2]
        mmplsize_mock.side_effect = lambda h: len(h)//8
        blocks_spec = [
            # (block bytes, chunk size)
            (
                self.buf(300) +
                bytes.fromhex("aabbccddeeff0011220a"),
                None,
            ),
            (
                self.buf(250) +
                bytes.fromhex("1122334405"),
                None,
            ),
            (
                self.buf(130) +
                bytes.fromhex("334455aabbccdd2211982311aacdfe10"),
                None,
            ),
        ]

        replies = chain.from_iterable(
            self.spec_to_exchange(spec, trim=True) for spec in blocks_spec
        )
        self.dongle.exchange.side_effect = [
            *replies,
            bytes([0, 0, 0x05]),  # Success response
        ]

        blocks_hex = [spec[0].hex() for spec in blocks_spec]
        self.assertEqual((True, 1),
                         self.hsm2dongle.update_ancestor(blocks_hex))

        self.assert_exchange([
            [0x30, 0x02, 0x00, 0x00, 0x00, 0x03],  # Init, 3 blocks
            [0x30, 0x03, 0x00, 0x4B],  # Block #1 meta
            [0x30, 0x04] + list(blocks_spec[0][0][:-10]),  # Block #1
            [0x30, 0x03, 0x00, 0x3E],  # Block #2 meta
            [0x30, 0x04] + list(blocks_spec[1][0][:-5]),  # Block #2
            [0x30, 0x03, 0x00, 0x20],  # Block #3 meta
            [0x30, 0x04] + list(blocks_spec[2][0][:-16]),  # Block #3
        ])

    @parameterized.expand([
        ("prot_invalid", 0x6B87, -4),
        ("rlp_invalid", 0x6B88, -5),
        ("block_too_old", 0x6B89, -5),
        ("block_too_short", 0x6B8A, -5),
        ("parent_hash_invalid", 0x6B8B, -5),
        ("receipt_root_invalid", 0x6B8C, -5),
        ("block_num_invalid", 0x6B8D, -5),
        ("btc_header_invalid", 0x6B90, -5),
        ("mm_rlp_len_mismatch", 0x6B93, -5),
        ("buffer_overflow", 0x6B99, -5),
        ("chain_mismatch", 0x6B9A, -6),
        ("ancestor_tip_mismatch", 0x6B9C, -7),
        ("unexpected", 0x6BFF, -10),
        ("error_response", bytes([0, 0, 0xFF]), -10),
    ])
    @patch("ledger.hsm2dongle.remove_mm_fields_if_present")
    @patch("ledger.hsm2dongle.rlp_mm_payload_size")
    def test_update_ancestor_bh_error_result(self, _, error_code, response,
                                             mmplsize_mock, rmvflds_mock):
        """A failure while sending block #2 data maps to ``(False, response)``."""
        rmvflds_mock.side_effect = lambda h: h
        mmplsize_mock.side_effect = lambda h: len(h)//8
        blocks_spec = [
            # (block bytes, chunk size)
            (self.buf(300), None),
            (self.buf(250), None),
            (self.buf(140), None),
        ]

        # Make the second block fail. Replies are consumed in order by: init,
        # block #1 meta & data, block #2 meta & data -- so the fifth reply
        # (index 2 + 2) is the one to replace.
        self.dongle.exchange.side_effect = self._replies_failing_at(
            blocks_spec, 2 + 2, error_code
        )

        blocks_hex = [spec[0].hex() for spec in blocks_spec]
        self.assertEqual(
            (False, response),
            self.hsm2dongle.update_ancestor(blocks_hex),
        )

        self.assert_exchange([
            [0x30, 0x02, 0x00, 0x00, 0x00, 0x03],  # Init, 3 blocks
            [0x30, 0x03, 0x00, 0x4B],  # Block #1 meta
            [0x30, 0x04] + list(blocks_spec[0][0]),  # Block #1
            [0x30, 0x03, 0x00, 0x3E],  # Block #2 meta
            [0x30, 0x04] + list(blocks_spec[1][0]),  # Block #2
        ])

    @parameterized.expand([
        ("prot_invalid", 0x6B87, -3),
        ("unexpected", 0x6BFF, -10),
        ("error_response", bytes([0, 0, 0xFF]), -10),
    ])
    @patch("ledger.hsm2dongle.remove_mm_fields_if_present")
    @patch("ledger.hsm2dongle.rlp_mm_payload_size")
    def test_update_ancestor_metadata_error_result(self, _, error_code, response,
                                                   mmplsize_mock, rmvflds_mock):
        """A failure while sending block #3 meta maps to ``(False, response)``."""
        rmvflds_mock.side_effect = lambda h: h
        mmplsize_mock.side_effect = lambda h: len(h)//8
        blocks_spec = [
            # (block bytes, chunk size)
            (self.buf(300), None),
            (self.buf(250), None),
            (self.buf(140), None),
        ]

        # Make the metadata of the third block fail. Replies are consumed in
        # order by: init, first and second block meta & chunks, third block
        # meta -- so the sixth reply (index 1 + 2 + 2) is the one to replace.
        self.dongle.exchange.side_effect = self._replies_failing_at(
            blocks_spec, 1 + 2 + 2, error_code
        )

        blocks_hex = [spec[0].hex() for spec in blocks_spec]
        self.assertEqual(
            (False, response),
            self.hsm2dongle.update_ancestor(blocks_hex),
        )

        self.assert_exchange([
            [0x30, 0x02, 0x00, 0x00, 0x00, 0x03],  # Init, 3 blocks
            [0x30, 0x03, 0x00, 0x4B],  # Block #1 meta
            [0x30, 0x04] + list(blocks_spec[0][0]),  # Block #1
            [0x30, 0x03, 0x00, 0x3E],  # Block #2 meta
            [0x30, 0x04] + list(blocks_spec[1][0]),  # Block #2
            [0x30, 0x03, 0x00, 0x23],  # Block #3 meta
        ])

    @patch("ledger.hsm2dongle.remove_mm_fields_if_present")
    @patch("ledger.hsm2dongle.rlp_mm_payload_size")
    def test_update_ancestor_metadata_error_generating(self, mmplsize_mock, rmvflds_mock):
        """A ``ValueError`` while computing the payload size yields ``(False, -2)``."""
        rmvflds_mock.side_effect = lambda h: h
        mmplsize_mock.side_effect = ValueError()
        self.dongle.exchange.side_effect = [bytes([0, 0, 0x03])]

        self.assertEqual(
            (False, -2),
            self.hsm2dongle.update_ancestor(["first-block", "second-block"]),
        )

        self.assert_exchange([
            [0x30, 0x02, 0x00, 0x00, 0x00, 0x02],  # Init, 2 blocks
        ])
        # Only the first block reaches the size computation before the failure
        self.assertEqual([call("first-block")], mmplsize_mock.call_args_list)

    @parameterized.expand([
        ("prot_invalid", CommException("a-message", 0x6B87), -1),
        ("unexpected", CommException("a-message", 0x6BFF), -10),
        ("invalid_response", bytes([0, 0, 0xFF]), -10),
    ])
    @patch("ledger.hsm2dongle.remove_mm_fields_if_present")
    def test_update_ancestor_init_error(self, _, error, response, rmvflds_mock):
        """A failure on the init exchange maps to ``(False, response)``."""
        rmvflds_mock.side_effect = lambda h: h
        self.dongle.exchange.side_effect = [error]

        self.assertEqual(
            (False, response),
            self.hsm2dongle.update_ancestor(["first-block", "second-block"]),
        )

        self.assert_exchange([
            [0x30, 0x02, 0x00, 0x00, 0x00, 0x02],  # Init, 2 blocks
        ])

    @patch("ledger.hsm2dongle.remove_mm_fields_if_present")
    def test_update_ancestor_remove_mmfields_exception(self, rmvflds_mock):
        """A ``ValueError`` while removing fields yields ``(False, -8)``, no I/O."""
        rmvflds_mock.side_effect = ValueError("an error")

        self.assertEqual(
            (False, -8),
            self.hsm2dongle.update_ancestor(["first-block", "second-block"]),
        )

        self.assert_exchange([])

    def test_authorize_signer_ok(self):
        """Both signatures are sent when the first reply asks for more."""
        self.dongle.exchange.side_effect = [
            bytes(),  # Response to hash, iteration - doesn't matter
            bytes.fromhex("aaaaaa01"),  # Response to first signature, MORE
            bytes.fromhex("aaaaaa02"),  # Response to second signature, OK
        ]

        signer_authorization = self._two_signature_authorization()
        self.assertTrue(self.hsm2dongle.authorize_signer(signer_authorization))

        self.assert_exchange([
            [0x51, 0x01] + [0xee]*32 + [0x43, 0x21],  # Sigver, hash plus iteration
            [0x51, 0x02] + [0xaa]*20,  # Signature #1
            [0x51, 0x02] + [0xbb]*25,  # Signature #2
        ])

    def test_authorize_signer_ok_first_sig(self):
        """The second signature is not sent when the first reply is OK."""
        self.dongle.exchange.side_effect = [
            bytes(),  # Response to hash, iteration - doesn't matter
            bytes.fromhex("aaaaaa02"),  # Response to first signature, OK
        ]

        signer_authorization = self._two_signature_authorization()
        self.assertTrue(self.hsm2dongle.authorize_signer(signer_authorization))

        self.assert_exchange([
            [0x51, 0x01] + [0xee]*32 + [0x43, 0x21],  # Sigver, hash plus iteration
            [0x51, 0x02] + [0xaa]*20,  # Signature #1
        ])

    def test_authorize_signer_sigver_error(self):
        """A ``CommException`` on the first exchange raises ``HSM2DongleError``."""
        self.dongle.exchange.side_effect = [
            CommException("an-error"),  # Response to hash, iteration - error
        ]

        with self.assertRaises(HSM2DongleError):
            self.hsm2dongle.authorize_signer(self._two_signature_authorization())

        self.assert_exchange([
            [0x51, 0x01] + [0xee]*32 + [0x43, 0x21],  # Sigver, hash plus iteration
        ])

    def test_authorize_signer_signature_error(self):
        """A ``CommException`` on a signature raises ``HSM2DongleError``."""
        self.dongle.exchange.side_effect = [
            bytes(),  # Response to hash, iteration - doesn't matter
            bytes.fromhex("aaaaaa01"),  # Response to first signature, MORE
            CommException("an-error"),  # Response to second signature, ERROR
        ]

        with self.assertRaises(HSM2DongleError):
            self.hsm2dongle.authorize_signer(self._two_signature_authorization())

        self.assert_exchange([
            [0x51, 0x01] + [0xee]*32 + [0x43, 0x21],  # Sigver, hash plus iteration
            [0x51, 0x02] + [0xaa]*20,  # Signature #1
            [0x51, 0x02] + [0xbb]*25,  # Signature #2
        ])

    def test_authorize_not_enough_signatures(self):
        """Running out of signatures while more are asked for raises an error."""
        self.dongle.exchange.side_effect = [
            bytes(),  # Response to hash, iteration - doesn't matter
            bytes.fromhex("aaaaaa01"),  # Response to first signature, MORE
            bytes.fromhex("aaaaaa01"),  # Response to second signature, MORE
        ]

        with self.assertRaises(HSM2DongleError):
            self.hsm2dongle.authorize_signer(self._two_signature_authorization())

        self.assert_exchange([
            [0x51, 0x01] + [0xee]*32 + [0x43, 0x21],  # Sigver, hash plus iteration
            [0x51, 0x02] + [0xaa]*20,  # Signature #1
            [0x51, 0x02] + [0xbb]*25,  # Signature #2
        ])