Coverage for tests/admin/test_migrate_db.py: 100%

230 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23from types import SimpleNamespace 

24from unittest import TestCase 

25from unittest.mock import Mock, patch, call 

26from admin.migrate_db import do_migrate_db 

27from admin.misc import AdminError 

28 

29 

@patch("sys.stdout")
@patch("admin.migrate_db.do_unlock")
@patch("admin.migrate_db.get_hsm")
@patch("admin.migrate_db.get_sgx_hsm")
@patch("admin.migrate_db.dispose_hsm")
@patch("admin.migrate_db.SGXMigrationAuthorization")
class TestMigrateDb(TestCase):
    """Tests for ``do_migrate_db`` with all of its collaborators patched.

    The class-level patches are applied bottom-up, so each test method
    receives its mocks in this order: SGXMigrationAuthorization,
    dispose_hsm, get_sgx_hsm, get_hsm, do_unlock, stdout. Every test
    forwards all but the last one (stdout) to ``setupMocks``.

    Helper methods are deliberately named ``assert_*`` / ``setupMocks``:
    the class-level ``patch`` decorators only wrap attributes whose name
    starts with ``test``, so helpers are called without extra mock arguments.
    """

    def setUp(self):
        """Build the default options namespace handed to ``do_migrate_db``."""
        options = SimpleNamespace()
        options.no_unlock = False
        options.verbose = "is-verbose"
        options.destination_sgx_host = "sgx-host"
        options.destination_sgx_port = 2345
        options.migration_authorization_file_path = "a-migauth-path"
        self.options = options

    def setupMocks(self, sgx_migration_authorization, dispose_hsm, get_sgx_hsm,
                   get_hsm, do_unlock):
        """Store the patched collaborators and wire up their happy-path values.

        After this call ``get_hsm`` returns ``self.src_hsm``, ``get_sgx_hsm``
        returns ``self.dst_hsm`` and ``SGXMigrationAuthorization.from_jsonfile``
        returns an authorization with a hex exporter, a hex importer and
        three hex-encoded signatures.
        """
        self.dispose_hsm = dispose_hsm
        self.get_sgx_hsm = get_sgx_hsm
        self.get_hsm = get_hsm
        self.do_unlock = do_unlock
        self.sgx_migration_authorization = sgx_migration_authorization

        self.src_hsm = Mock()
        self.dst_hsm = Mock()

        mig_spec = Mock()
        mig_spec.exporter = "aa"*32
        mig_spec.importer = "bb"*32
        self.sgx_migration_authorization_inst = Mock()
        self.sgx_migration_authorization_inst.migration_spec = mig_spec
        # A fresh list per test: test_migauth_invalid_signature appends to it.
        # The entries are the hex encodings of b"sig-one", b"sig-two" and
        # b"sig-three".
        self.sgx_migration_authorization_inst.signatures = \
            ["7369672d6f6e65", "7369672d74776f", "7369672d7468726565"]
        self.sgx_migration_authorization.from_jsonfile.return_value = \
            self.sgx_migration_authorization_inst

        self.src_hsm.migrate_db_get_evidence.return_value = b"the source evidence"
        self.dst_hsm.migrate_db_get_evidence.return_value = b"the destination evidence"
        self.src_hsm.migrate_db_get_data.return_value = b"the source data"

        get_hsm.return_value = self.src_hsm
        get_sgx_hsm.return_value = self.dst_hsm

    # ------------------------------------------------------------------
    # Shared assertions
    # ------------------------------------------------------------------

    def assert_disposed_hsms(self):
        """Assert dispose_hsm ran exactly twice: source first, then destination."""
        self.assertEqual(2, self.dispose_hsm.call_count)
        self.assertEqual(call(self.src_hsm), self.dispose_hsm.call_args_list[0])
        self.assertEqual(call(self.dst_hsm), self.dispose_hsm.call_args_list[1])

    def assert_migration_fails(self, *fragments):
        """Run do_migrate_db and assert it raises AdminError.

        Each string in ``fragments`` must be contained in the first argument
        of the raised exception.
        """
        with self.assertRaises(AdminError) as e:
            do_migrate_db(self.options)

        message = e.exception.args[0]
        for fragment in fragments:
            self.assertIn(fragment, message)

    def assert_migauth_loaded(self):
        """Assert the authorization was loaded from the configured path."""
        self.sgx_migration_authorization.from_jsonfile.assert_called_with(
            "a-migauth-path")

    def assert_no_hsm_access(self):
        """Assert neither the unlock nor any HSM acquisition was attempted."""
        self.do_unlock.assert_not_called()
        self.get_hsm.assert_not_called()
        self.get_sgx_hsm.assert_not_called()

    def assert_hsms_acquired(self):
        """Assert the unlock happened and both HSM getters got the options."""
        self.do_unlock.assert_called_with(self.options, label=False)
        self.get_hsm.assert_called_with("is-verbose")
        self.get_sgx_hsm.assert_called_with("sgx-host", 2345, "is-verbose")

    def assert_spec_sent(self):
        """Assert both HSMs received the decoded migration spec.

        The source is expected to get 0x01 as first argument and the
        destination 0x02; the remaining arguments are the decoded exporter,
        importer and signatures configured in ``setupMocks``.
        """
        for hsm, first_arg in ((self.src_hsm, 0x01), (self.dst_hsm, 0x02)):
            hsm.migrate_db_spec.assert_called_with(
                first_arg,
                bytes.fromhex("aa"*32),
                bytes.fromhex("bb"*32),
                [b"sig-one", b"sig-two", b"sig-three"])

    def assert_evidence_sent(self):
        """Assert each HSM was sent the evidence returned by the other one."""
        self.src_hsm.migrate_db_send_evidence.assert_called_with(
            b"the destination evidence")
        self.dst_hsm.migrate_db_send_evidence.assert_called_with(
            b"the source evidence")

    def assert_no_evidence_sent(self):
        """Assert no evidence was sent to either HSM."""
        self.src_hsm.migrate_db_send_evidence.assert_not_called()
        self.dst_hsm.migrate_db_send_evidence.assert_not_called()

    def assert_no_data_transfer(self):
        """Assert data was neither read from the source nor sent to the destination."""
        self.src_hsm.migrate_db_get_data.assert_not_called()
        self.dst_hsm.migrate_db_send_data.assert_not_called()

    # ------------------------------------------------------------------
    # Happy path
    # ------------------------------------------------------------------

    def test_ok(self, *args):
        """A fully successful migration performs every step and disposes both HSMs."""
        self.setupMocks(*args[:-1])

        do_migrate_db(self.options)

        self.assert_hsms_acquired()
        self.assert_migauth_loaded()
        self.assert_spec_sent()

        self.src_hsm.migrate_db_get_evidence.assert_called()
        self.dst_hsm.migrate_db_get_evidence.assert_called()
        self.assert_evidence_sent()

        self.src_hsm.migrate_db_get_data.assert_called()
        self.dst_hsm.migrate_db_send_data.assert_called_with(b"the source data")

        self.assert_disposed_hsms()

    # ------------------------------------------------------------------
    # Failures before any HSM is touched
    # ------------------------------------------------------------------

    def test_no_migauth(self, *args):
        """A missing authorization path fails before loading anything."""
        self.setupMocks(*args[:-1])

        self.options.migration_authorization_file_path = None

        self.assert_migration_fails("file path given")

        self.sgx_migration_authorization.from_jsonfile.assert_not_called()
        self.assert_no_hsm_access()

    def test_migauth_load_fails(self, *args):
        """An error while loading the authorization is reported as AdminError."""
        self.setupMocks(*args[:-1])

        self.sgx_migration_authorization.from_jsonfile.side_effect = \
            Exception("json made a boo boo")

        self.assert_migration_fails("json made a boo boo")

        self.assert_migauth_loaded()
        self.assert_no_hsm_access()

    def test_migauth_no_sigs(self, *args):
        """An authorization without signatures is rejected."""
        self.setupMocks(*args[:-1])

        self.sgx_migration_authorization_inst.signatures = []

        self.assert_migration_fails("At least one signature")

        self.assert_migauth_loaded()
        self.assert_no_hsm_access()

    def test_migauth_invalid_exporter(self, *args):
        """A non-hexadecimal exporter is rejected."""
        self.setupMocks(*args[:-1])

        self.sgx_migration_authorization_inst.migration_spec.exporter = "not-hex"

        self.assert_migration_fails("non-hexadecimal")

        self.assert_migauth_loaded()
        self.assert_no_hsm_access()

    def test_migauth_invalid_importer(self, *args):
        """A non-hexadecimal importer is rejected."""
        self.setupMocks(*args[:-1])

        self.sgx_migration_authorization_inst.migration_spec.importer = "not-hex"

        self.assert_migration_fails("non-hexadecimal")

        self.assert_migauth_loaded()
        self.assert_no_hsm_access()

    def test_migauth_invalid_signature(self, *args):
        """A non-hexadecimal signature is rejected."""
        self.setupMocks(*args[:-1])

        self.sgx_migration_authorization_inst.signatures.append("not-hex")

        self.assert_migration_fails("non-hexadecimal")

        self.assert_migauth_loaded()
        self.assert_no_hsm_access()

    def test_unlock_fails(self, *args):
        """A failing unlock aborts before either HSM is acquired."""
        self.setupMocks(*args[:-1])

        self.do_unlock.side_effect = RuntimeError("unlock boo boo")

        self.assert_migration_fails("unlock device", "boo boo")

        self.assert_migauth_loaded()
        self.get_hsm.assert_not_called()
        self.get_sgx_hsm.assert_not_called()

    # ------------------------------------------------------------------
    # Failures during the migration itself (HSMs must still be disposed)
    # ------------------------------------------------------------------

    def test_spec_fails(self, *args):
        """A destination spec failure stops before any evidence is gathered."""
        self.setupMocks(*args[:-1])

        self.dst_hsm.migrate_db_spec.side_effect = RuntimeError("wrong spec")

        self.assert_migration_fails("Failed to migrate DB", "wrong spec")

        self.assert_migauth_loaded()
        self.assert_hsms_acquired()
        self.assert_spec_sent()

        self.src_hsm.migrate_db_get_evidence.assert_not_called()
        self.dst_hsm.migrate_db_get_evidence.assert_not_called()
        self.assert_no_evidence_sent()
        self.assert_no_data_transfer()

        self.assert_disposed_hsms()

    def test_get_evidence_fails(self, *args):
        """A source evidence failure stops before the destination is queried."""
        self.setupMocks(*args[:-1])

        self.src_hsm.migrate_db_get_evidence.side_effect = \
            RuntimeError("evidence no no")

        self.assert_migration_fails("Failed to migrate DB", "evidence no no")

        self.assert_migauth_loaded()
        self.assert_hsms_acquired()
        self.assert_spec_sent()

        self.src_hsm.migrate_db_get_evidence.assert_called()
        self.dst_hsm.migrate_db_get_evidence.assert_not_called()
        self.assert_no_evidence_sent()
        self.assert_no_data_transfer()

        self.assert_disposed_hsms()

    def test_send_evidence_fails(self, *args):
        """A destination send-evidence failure stops before any data is moved."""
        self.setupMocks(*args[:-1])

        self.dst_hsm.migrate_db_send_evidence.side_effect = \
            RuntimeError("sending bad bad")

        self.assert_migration_fails("Failed to migrate DB", "sending bad bad")

        self.assert_migauth_loaded()
        self.assert_hsms_acquired()
        self.assert_spec_sent()

        self.src_hsm.migrate_db_get_evidence.assert_called()
        self.dst_hsm.migrate_db_get_evidence.assert_called()
        self.assert_evidence_sent()
        self.assert_no_data_transfer()

        self.assert_disposed_hsms()

    def test_get_data_fails(self, *args):
        """A source get-data failure stops before anything is sent to the destination."""
        self.setupMocks(*args[:-1])

        self.src_hsm.migrate_db_get_data.side_effect = RuntimeError("data nana")

        self.assert_migration_fails("Failed to migrate DB", "data nana")

        self.assert_migauth_loaded()
        self.assert_hsms_acquired()
        self.assert_spec_sent()

        self.src_hsm.migrate_db_get_evidence.assert_called()
        self.dst_hsm.migrate_db_get_evidence.assert_called()
        self.assert_evidence_sent()
        self.src_hsm.migrate_db_get_data.assert_called()
        self.dst_hsm.migrate_db_send_data.assert_not_called()

        self.assert_disposed_hsms()

    def test_send_data_fails(self, *args):
        """A destination send-data failure is reported after every step was attempted."""
        self.setupMocks(*args[:-1])

        self.dst_hsm.migrate_db_send_data.side_effect = RuntimeError("import boo boo")

        self.assert_migration_fails("Failed to migrate DB", "import boo boo")

        self.assert_migauth_loaded()
        self.assert_hsms_acquired()
        self.assert_spec_sent()

        self.src_hsm.migrate_db_get_evidence.assert_called()
        self.dst_hsm.migrate_db_get_evidence.assert_called()
        self.assert_evidence_sent()
        self.src_hsm.migrate_db_get_data.assert_called()
        self.dst_hsm.migrate_db_send_data.assert_called_with(b"the source data")

        self.assert_disposed_hsms()