Coverage for admin/migrate_db.py: 98%

58 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23from .misc import info, head, get_hsm, get_sgx_hsm, dispose_hsm, AdminError 

24from .unlock import do_unlock 

25from .sgx_migration_authorization import SGXMigrationAuthorization 

26from sgx.hsm2dongle import SgxUpgradeRoles 

27 

28 

def do_migrate_db(options):
    """Migrate the DB from the source device to a destination SGX powHSM.

    The steps, in order, are: validate the given options, load the migration
    authorization file, unlock the source device, and then exchange the
    migration spec, evidence and data between the source HSM (obtained with
    ``get_hsm``) and the destination HSM (obtained with ``get_sgx_hsm``).

    Args:
        options: Object from which this function reads
            ``destination_sgx_host``, ``destination_sgx_port``,
            ``migration_authorization_file_path`` and ``verbose``. It is also
            handed as-is to ``do_unlock``.

    Raises:
        AdminError: If the destination host or port is missing, if no
            migration authorization file path is given, if the authorization
            cannot be loaded or has no signatures, if unlocking fails, or if
            any step of the migration itself fails. Where an underlying
            exception exists it is chained as the cause.
    """
    head("### -> Migrate DB", fill="#")
    hsm_src = None
    hsm_dst = None

    if options.destination_sgx_port is None or \
            options.destination_sgx_host is None:
        raise AdminError("Destination SGX powHSM host and port must be provided")

    # Require a migration authorization file
    if options.migration_authorization_file_path is None:
        raise AdminError("No migration authorization file path given")

    # Load the given migration authorization
    try:
        migration_authorization = SGXMigrationAuthorization.from_jsonfile(
            options.migration_authorization_file_path)
        # Require at least one signature
        if len(migration_authorization.signatures) == 0:
            raise RuntimeError("At least one signature is needed to "
                               "perform a DB migration")
        # Perform conversions: the exporter, the importer and every signature
        # are decoded from hex strings into bytes
        source_mre = bytes.fromhex(migration_authorization.migration_spec.exporter)
        destination_mre = bytes.fromhex(migration_authorization.migration_spec.importer)
        signatures = [bytes.fromhex(s) for s in migration_authorization.signatures]
    except Exception as e:
        # Chain the original exception so the root cause is kept in tracebacks
        raise AdminError(
            f"While loading the migration authorization file: {str(e)}") from e

    # Attempt to unlock the source device
    try:
        do_unlock(options, label=False)
    except Exception as e:
        raise AdminError(f"Failed to unlock device: {str(e)}") from e

    # DB migration
    # NOTE(review): options.verbose is passed here as the second positional
    # argument, whereas every other info() call in this function passes nl=
    # by keyword -- confirm this is intended.
    info("Migrating DB... ", options.verbose)
    try:
        hsm_src = get_hsm(options.verbose)
        hsm_dst = get_sgx_hsm(
            options.destination_sgx_host,
            options.destination_sgx_port,
            options.verbose)

        # Both ends receive the same spec, each tagged with its own role
        info("Sending source spec...", nl=False)
        hsm_src.migrate_db_spec(
            SgxUpgradeRoles.EXPORTER, source_mre, destination_mre, signatures)
        info("OK")
        info("Sending destination spec...", nl=False)
        hsm_dst.migrate_db_spec(
            SgxUpgradeRoles.IMPORTER, source_mre, destination_mre, signatures)
        info("OK")

        info("Getting source evidence...", nl=False)
        src_evidence = hsm_src.migrate_db_get_evidence()
        info(f"OK. Got {len(src_evidence)} bytes")
        info("Getting destination evidence...", nl=False)
        dst_evidence = hsm_dst.migrate_db_get_evidence()
        info(f"OK. Got {len(dst_evidence)} bytes")

        # Evidence is cross-exchanged: each end receives the other's evidence
        info("Sending destination evidence to source...", nl=False)
        hsm_src.migrate_db_send_evidence(dst_evidence)
        info("OK")
        info("Sending source evidence to destination...", nl=False)
        hsm_dst.migrate_db_send_evidence(src_evidence)
        info("OK")

        info("Getting data from source...", nl=False)
        migration_data = hsm_src.migrate_db_get_data()
        info("OK")
        info("Sending data to destination...", nl=False)
        hsm_dst.migrate_db_send_data(migration_data)
        info("OK")
    except Exception as e:
        raise AdminError(f"Failed to migrate DB: {str(e)}") from e
    finally:
        # Always attempt to dispose of the destination HSM, even if disposing
        # of the source HSM raises
        try:
            dispose_hsm(hsm_src)
        finally:
            dispose_hsm(hsm_dst)

    info("DB migrated successfully")