Coverage for admin/signer_authorization.py: 95%

76 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23import json 

24import secp256k1 as ec 

25from .utils import is_hex_string_of_length, hex_or_decimal_string_to_int, keccak_256 

26from .ledger_utils import encode_eth_message 

27 

28 

class SignerAuthorization:
    """A signer version together with the signatures that authorize it.

    Signatures are kept as hex strings; each one must deserialize as a DER
    encoded ECDSA signature (see ``_assert_signature_valid``).
    """

    VERSION = 1  # Only supported version

    @staticmethod
    def from_jsonfile(path):
        """Build a SignerAuthorization from the JSON file at ``path``.

        The file must contain a JSON object with a "version" equal to
        ``VERSION``, a "signer" object holding "hash" and "iteration", and a
        "signatures" list.

        Raises:
            ValueError: if the content is not valid JSON, is not an object,
                declares an unsupported version, lacks a required field, has
                a field of the wrong shape, or holds values rejected by
                SignerVersion / SignerAuthorization.

        Errors raised while opening the file (OSError) are not wrapped.
        """
        try:
            with open(path, "r") as file:
                signer_auth_map = json.loads(file.read())

            if not isinstance(signer_auth_map, dict):
                raise ValueError(
                    "JSON file must contain an object as a top level element")

            if signer_auth_map["version"] != SignerAuthorization.VERSION:
                raise ValueError("Unsupported file format version "
                                 f"{signer_auth_map['version']}")

            # Extract every required field before constructing anything, so a
            # missing or mistyped field is reported as a file format problem.
            signer = signer_auth_map["signer"]
            signer_hash = signer["hash"]
            signer_iteration = signer["iteration"]
            signatures = signer_auth_map["signatures"]

            return SignerAuthorization(
                SignerVersion(signer_hash, signer_iteration), signatures)
        except KeyError as e:
            # A missing field used to escape as a bare KeyError.
            raise ValueError(
                'Unable to read Signer Authorization from "%s": '
                "missing field %s" % (path, str(e))) from e
        except (TypeError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass, so it lands here.
            raise ValueError('Unable to read Signer Authorization from "%s": %s' %
                             (path, str(e))) from e

    @staticmethod
    def for_signer_version(signer_version):
        """Return an authorization for ``signer_version`` with no signatures."""
        return SignerAuthorization(signer_version, [])

    def __init__(self, signer_version, signatures):
        """Create an authorization.

        Args:
            signer_version: a SignerVersion instance.
            signatures: list of hex-encoded DER signatures; it is copied, so
                later changes to the caller's list do not affect this object.

        Raises:
            ValueError: if ``signer_version`` is not a SignerVersion, if
                ``signatures`` is not a list, or if any signature is invalid.
        """
        # Validate the argument types *before* touching them: slicing a
        # non-list first would raise a stray TypeError instead of ValueError.
        if not isinstance(signer_version, SignerVersion):
            raise ValueError(f"Invalid signer version given: {signer_version}")

        if not isinstance(signatures, list):
            raise ValueError("Signatures must be a list")

        self._signer_version = signer_version
        self._signatures = signatures[:]

        for signature in self._signatures:
            self._assert_signature_valid(signature)

    @property
    def signer_version(self):
        """The SignerVersion this authorization refers to."""
        return self._signer_version

    @property
    def signatures(self):
        """A copy of the list of signatures collected so far."""
        return self._signatures[:]

    def add_signature(self, signature):
        """Validate ``signature`` and append it.

        Raises:
            ValueError: if the signature is not a valid hex-encoded DER
                signature.
        """
        self._assert_signature_valid(signature)
        self._signatures.append(signature)

    def to_dict(self):
        """Return the JSON-serializable form read back by ``from_jsonfile``."""
        return {
            "version": self.VERSION,
            "signer": self._signer_version.to_dict(),
            "signatures": self._signatures[:],
        }

    def save_to_jsonfile(self, path):
        """Write ``to_dict()`` as indented JSON plus a trailing newline."""
        with open(path, "w") as file:
            file.write("%s\n" % json.dumps(self.to_dict(), indent=2))

    def _assert_signature_valid(self, signature):
        """Raise ValueError unless ``signature`` is a hex-encoded DER signature.

        Any failure while decoding the hex string or deserializing the
        signature is reported as a ValueError carrying the original error.
        """
        try:
            ec.PrivateKey().ecdsa_deserialize(bytes.fromhex(signature))
        except Exception as e:
            raise ValueError(f"Invalid DER signature: {signature}: {e}") from e

99 

100 

class SignerVersion:
    """A signer identified by a 32-byte hex hash and a 16-bit iteration."""

    def __init__(self, hash, iteration):
        """Validate and store the signer hash and iteration.

        Args:
            hash: 32-byte hex string; stored lowercased.
            iteration: an int, or a string that is converted with
                ``hex_or_decimal_string_to_int``; must end up in [0, 2**16).

        Raises:
            ValueError: if the hash or the iteration is invalid.
        """
        hash_ok = is_hex_string_of_length(hash, 32)
        if not hash_ok:
            raise ValueError("Hash must be a 32-byte hex string")

        parsed = iteration
        if type(parsed) is str:
            parsed = hex_or_decimal_string_to_int(parsed)

        # Exact type check on purpose: anything that is not a plain int
        # (bool included) is rejected.
        in_range = type(parsed) is int and 0 <= parsed < (1 << 16)
        if not in_range:
            raise ValueError("Invalid iteration (must be a 16-bit unsigned int)")

        self._hash = hash.lower()
        self._iteration = parsed

    @property
    def hash(self):
        """The signer hash as a lowercase hex string."""
        return self._hash

    @property
    def iteration(self):
        """The signer iteration as an int."""
        return self._iteration

    @property
    def msg(self):
        """The plain-text authorization message for this signer version."""
        return f"RSK_powHSM_signer_{self.hash}_iteration_{self.iteration}"

    def get_authorization_msg(self):
        """Return ``msg`` passed through ``encode_eth_message``."""
        plain = self.msg
        return encode_eth_message(plain)

    def get_authorization_digest(self):
        """Return ``keccak_256`` applied to the authorization message."""
        encoded = self.get_authorization_msg()
        return keccak_256(encoded)

    def to_dict(self):
        """Return the hash and iteration as a plain dict."""
        return dict(hash=self.hash, iteration=self.iteration)

    def __repr__(self):
        return f"SignerVersion(hash=0x{self.hash}, iteration={self.iteration})"