Coverage for admin/sgx_migration_authorization.py: 96%

80 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23import json 

24import secp256k1 as ec 

25from .utils import is_hex_string_of_length, normalize_hex_string, keccak_256 

26from .ledger_utils import encode_eth_message 

27 

28 

29class SGXMigrationAuthorization: 

30 VERSION = 1 # Only supported version 

31 

32 @staticmethod 

33 def from_jsonfile(path): 

34 try: 

35 with open(path, "r") as file: 

36 spec_auth_map = json.loads(file.read()) 

37 

38 if type(spec_auth_map) != dict: 

39 raise ValueError( 

40 "JSON file must contain an object as a top level element") 

41 if spec_auth_map["version"] != SGXMigrationAuthorization.VERSION: 

42 raise ValueError("Unsupported file format version " 

43 f"{spec_auth_map['version']}") 

44 

45 migration_spec = SGXMigrationSpec(spec_auth_map["hashes"]) 

46 signatures = spec_auth_map["signatures"] 

47 return SGXMigrationAuthorization(migration_spec, signatures) 

48 except (ValueError, json.JSONDecodeError) as e: 

49 raise ValueError("Unable to read SGX Migration Authorization from " 

50 f"{path}: {str(e)}") 

51 

52 @staticmethod 

53 def for_spec(migration_spec): 

54 return SGXMigrationAuthorization(migration_spec, []) 

55 

56 def __init__(self, migration_spec, signatures): 

57 self._migration_spec = migration_spec 

58 self._signatures = signatures[:] 

59 

60 if type(self._migration_spec) != SGXMigrationSpec: 

61 raise ValueError(f"Invalid migration spec given: {migration_spec}") 

62 

63 if type(signatures) != list: 

64 raise ValueError("Signatures must be a list") 

65 

66 for signature in signatures: 

67 self._assert_signature_valid(signature) 

68 

69 @property 

70 def migration_spec(self): 

71 return self._migration_spec 

72 

73 @property 

74 def signatures(self): 

75 return self._signatures[:] 

76 

77 def add_signature(self, signature): 

78 self._assert_signature_valid(signature) 

79 if self._contains_signature(signature): 

80 raise ValueError("Signature already exists") 

81 self._signatures.append(signature) 

82 

83 def to_dict(self): 

84 return { 

85 "version": self.VERSION, 

86 "hashes": self._migration_spec.to_dict(), 

87 "signatures": self._signatures[:], 

88 } 

89 

90 def save_to_jsonfile(self, path): 

91 with open(path, "w") as file: 

92 file.write(json.dumps(self.to_dict(), indent=2)) 

93 

94 def _assert_signature_valid(self, signature): 

95 try: 

96 ec.PrivateKey().ecdsa_deserialize(bytes.fromhex(signature)) 

97 except Exception as e: 

98 raise ValueError(f"Invalid DER signature: {signature}: {e}") 

99 

100 def _contains_signature(self, signature): 

101 return signature in self._signatures 

102 

103 

104class SGXMigrationSpec: 

105 def __init__(self, hashes): 

106 if type(hashes) != dict: 

107 raise ValueError("Hashes must be a dict") 

108 if not is_hex_string_of_length(hashes["exporter"], 32, allow_prefix=True): 

109 raise ValueError("Exporter hash must be a 32-byte hex string") 

110 if not is_hex_string_of_length(hashes["importer"], 32, allow_prefix=True): 

111 raise ValueError("Importer hash must be a 32-byte hex string") 

112 self._exporter_hash = normalize_hex_string(hashes["exporter"]) 

113 self._importer_hash = normalize_hex_string(hashes["importer"]) 

114 

115 @property 

116 def exporter(self): 

117 return self._exporter_hash 

118 

119 @property 

120 def importer(self): 

121 return self._importer_hash 

122 

123 @property 

124 def msg(self): 

125 return f"RSK_powHSM_SGX_upgrade_from_{self.exporter}_to_{self.importer}" 

126 

127 def get_authorization_msg(self): 

128 return encode_eth_message(self.msg) 

129 

130 def get_authorization_digest(self): 

131 return keccak_256(self.get_authorization_msg()) 

132 

133 def to_dict(self): 

134 return { 

135 "exporter": self.exporter, 

136 "importer": self.importer, 

137 }