Coverage for admin/signer_authorization.py: 94%
70 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-05 20:41 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-05 20:41 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23import json
24import secp256k1 as ec
25import sha3
26from .utils import is_hex_string_of_length, hex_or_decimal_string_to_int
27from .ledger_utils import encode_eth_message
class SignerAuthorization:
    """A signer version together with the signatures collected for it.

    Instances can be built directly, created empty for a given signer
    version, or loaded from / saved to a JSON file of the form::

        {"version": 1, "signer": {...}, "signatures": [...]}
    """

    VERSION = 1  # Only supported version

    @staticmethod
    def from_jsonfile(path):
        """Load a SignerAuthorization from the JSON file at ``path``.

        The top-level JSON element must be an object whose "version" equals
        ``SignerAuthorization.VERSION``. Its "signer" entry supplies the
        "hash" and "iteration" used to build the SignerVersion, and its
        "signatures" entry is passed on as the list of signatures.

        Raises:
            ValueError: if the content is not valid JSON, the top-level
                element is not an object, the version is unsupported, or
                building the SignerVersion / SignerAuthorization raises
                ValueError. The message is prefixed with the file path.

        Other exceptions (for example a KeyError for a missing key, or
        whatever ``open`` raises) are not caught here and propagate as is.
        """
        try:
            with open(path, "r") as file:
                signer_auth_map = json.load(file)

            if type(signer_auth_map) is not dict:
                raise ValueError(
                    "JSON file must contain an object as a top level element")

            if signer_auth_map["version"] != SignerAuthorization.VERSION:
                raise ValueError("Unsupported file format version "
                                 f"{signer_auth_map['version']}")

            return SignerAuthorization(
                SignerVersion(signer_auth_map["signer"]["hash"],
                              signer_auth_map["signer"]["iteration"]),
                signer_auth_map["signatures"])
        except (ValueError, json.JSONDecodeError) as e:
            raise ValueError('Unable to read Signer Authorization from "%s": %s' %
                             (path, str(e))) from e

    @staticmethod
    def for_signer_version(signer_version):
        """Return a SignerAuthorization for ``signer_version`` with no signatures."""
        return SignerAuthorization(signer_version, [])

    def __init__(self, signer_version, signatures):
        """Validate and store the signer version and its signatures.

        Args:
            signer_version: must be exactly a SignerVersion instance.
            signatures: must be exactly a list; every element must pass
                ``_assert_signature_valid``. The list is copied, so later
                changes to the caller's list do not affect this instance.

        Raises:
            ValueError: if either argument has the wrong type or any
                signature is invalid.
        """
        # The container type is checked before anything touches it, so that
        # a non-list value (e.g. None) is reported as the documented
        # ValueError instead of failing with a TypeError while being sliced.
        if type(signatures) is not list:
            raise ValueError("Signatures must be a list")

        if type(signer_version) is not SignerVersion:
            raise ValueError(f"Invalid signer version given: {signer_version}")

        self._signer_version = signer_version
        self._signatures = signatures[:]

        for signature in self._signatures:
            self._assert_signature_valid(signature)

    @property
    def signer_version(self):
        """The SignerVersion this authorization refers to."""
        return self._signer_version

    @property
    def signatures(self):
        """A copy of the stored signatures (mutating it has no effect here)."""
        return self._signatures[:]

    def add_signature(self, signature):
        """Validate ``signature`` and append it to the stored signatures.

        Raises:
            ValueError: if the signature does not pass validation.
        """
        self._assert_signature_valid(signature)
        self._signatures.append(signature)

    def to_dict(self):
        """Return the dict representation used for JSON serialisation."""
        return {
            "version": self.VERSION,
            "signer": self._signer_version.to_dict(),
            "signatures": self._signatures[:],
        }

    def save_to_jsonfile(self, path):
        """Write ``to_dict()`` to ``path`` as indented JSON plus a newline."""
        with open(path, "w") as file:
            file.write("%s\n" % json.dumps(self.to_dict(), indent=2))

    def _assert_signature_valid(self, signature):
        """Raise ValueError unless ``signature`` is a hex-encoded signature
        that the secp256k1 library can deserialize.

        Any failure (non-string input, invalid hex, rejected encoding) is
        converted into a ValueError carrying the offending value.
        """
        try:
            # The key object is created only to reach ecdsa_deserialize;
            # it is discarded right away.
            ec.PrivateKey().ecdsa_deserialize(bytes.fromhex(signature))
        except Exception as e:
            raise ValueError(f"Invalid DER signature: {signature}: {e}") from e
class SignerVersion:
    """A signer identified by a hash and an iteration number.

    Besides exposing both values, the class derives the text message,
    encoded message and digest used when authorizing this version.
    """

    def __init__(self, hash, iteration):
        """Validate and store the signer hash and iteration.

        Args:
            hash: must satisfy ``is_hex_string_of_length(hash, 32)``; it is
                stored lowercased.
            iteration: an int, or a str that is first converted with
                ``hex_or_decimal_string_to_int``. The resulting value must
                be exactly of type int and lie in the range [0, 2**16).

        Raises:
            ValueError: if the hash or the iteration is rejected.
        """
        hash_is_valid = is_hex_string_of_length(hash, 32)
        if not hash_is_valid:
            raise ValueError("Hash must be a 32-byte hex string")

        # Strings are converted up front so the range check below only ever
        # has to deal with the numeric form.
        parsed_iteration = iteration
        if type(parsed_iteration) is str:
            parsed_iteration = hex_or_decimal_string_to_int(parsed_iteration)

        # Exact type match on purpose: subclasses of int (such as bool) are
        # rejected just like any other non-int value.
        fits_in_16_bits = (type(parsed_iteration) is int
                           and 0 <= parsed_iteration < 2**16)
        if not fits_in_16_bits:
            raise ValueError("Invalid iteration (must be a 16-bit unsigned int)")

        self._hash = hash.lower()
        self._iteration = parsed_iteration

    @property
    def hash(self):
        """The signer hash, lowercased at construction time."""
        return self._hash

    @property
    def iteration(self):
        """The signer iteration as an int."""
        return self._iteration

    @property
    def msg(self):
        """The plain-text authorization message for this signer version."""
        return f"RSK_powHSM_signer_{self._hash}_iteration_{str(self._iteration)}"

    def get_authorization_msg(self):
        """Return ``msg`` after passing it through ``encode_eth_message``."""
        plain_message = self.msg
        return encode_eth_message(plain_message)

    def get_authorization_digest(self):
        """Return the keccak-256 digest of the encoded authorization message."""
        encoded_message = self.get_authorization_msg()
        hasher = sha3.keccak_256(encoded_message)
        return hasher.digest()

    def to_dict(self):
        """Return the hash and iteration as a plain dict."""
        return {"hash": self.hash, "iteration": self.iteration}

    def __repr__(self):
        """Return a debugging representation with the hash shown 0x-prefixed."""
        return f"SignerVersion(hash=0x{self.hash}, iteration={self.iteration})"