Coverage for admin/sgx_migration_authorization.py: 96%
80 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23import json
24import secp256k1 as ec
25from .utils import is_hex_string_of_length, normalize_hex_string, keccak_256
26from .ledger_utils import encode_eth_message
29class SGXMigrationAuthorization:
30 VERSION = 1 # Only supported version
32 @staticmethod
33 def from_jsonfile(path):
34 try:
35 with open(path, "r") as file:
36 spec_auth_map = json.loads(file.read())
38 if type(spec_auth_map) != dict:
39 raise ValueError(
40 "JSON file must contain an object as a top level element")
41 if spec_auth_map["version"] != SGXMigrationAuthorization.VERSION:
42 raise ValueError("Unsupported file format version "
43 f"{spec_auth_map['version']}")
45 migration_spec = SGXMigrationSpec(spec_auth_map["hashes"])
46 signatures = spec_auth_map["signatures"]
47 return SGXMigrationAuthorization(migration_spec, signatures)
48 except (ValueError, json.JSONDecodeError) as e:
49 raise ValueError("Unable to read SGX Migration Authorization from "
50 f"{path}: {str(e)}")
52 @staticmethod
53 def for_spec(migration_spec):
54 return SGXMigrationAuthorization(migration_spec, [])
56 def __init__(self, migration_spec, signatures):
57 self._migration_spec = migration_spec
58 self._signatures = signatures[:]
60 if type(self._migration_spec) != SGXMigrationSpec:
61 raise ValueError(f"Invalid migration spec given: {migration_spec}")
63 if type(signatures) != list:
64 raise ValueError("Signatures must be a list")
66 for signature in signatures:
67 self._assert_signature_valid(signature)
69 @property
70 def migration_spec(self):
71 return self._migration_spec
73 @property
74 def signatures(self):
75 return self._signatures[:]
77 def add_signature(self, signature):
78 self._assert_signature_valid(signature)
79 if self._contains_signature(signature):
80 raise ValueError("Signature already exists")
81 self._signatures.append(signature)
83 def to_dict(self):
84 return {
85 "version": self.VERSION,
86 "hashes": self._migration_spec.to_dict(),
87 "signatures": self._signatures[:],
88 }
90 def save_to_jsonfile(self, path):
91 with open(path, "w") as file:
92 file.write(json.dumps(self.to_dict(), indent=2))
94 def _assert_signature_valid(self, signature):
95 try:
96 ec.PrivateKey().ecdsa_deserialize(bytes.fromhex(signature))
97 except Exception as e:
98 raise ValueError(f"Invalid DER signature: {signature}: {e}")
100 def _contains_signature(self, signature):
101 return signature in self._signatures
104class SGXMigrationSpec:
105 def __init__(self, hashes):
106 if type(hashes) != dict:
107 raise ValueError("Hashes must be a dict")
108 if not is_hex_string_of_length(hashes["exporter"], 32, allow_prefix=True):
109 raise ValueError("Exporter hash must be a 32-byte hex string")
110 if not is_hex_string_of_length(hashes["importer"], 32, allow_prefix=True):
111 raise ValueError("Importer hash must be a 32-byte hex string")
112 self._exporter_hash = normalize_hex_string(hashes["exporter"])
113 self._importer_hash = normalize_hex_string(hashes["importer"])
115 @property
116 def exporter(self):
117 return self._exporter_hash
119 @property
120 def importer(self):
121 return self._importer_hash
123 @property
124 def msg(self):
125 return f"RSK_powHSM_SGX_upgrade_from_{self.exporter}_to_{self.importer}"
127 def get_authorization_msg(self):
128 return encode_eth_message(self.msg)
130 def get_authorization_digest(self):
131 return keccak_256(self.get_authorization_msg())
133 def to_dict(self):
134 return {
135 "exporter": self.exporter,
136 "importer": self.importer,
137 }