Coverage for admin/signer_authorization.py: 94%

70 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-05 20:41 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23import json 

24import secp256k1 as ec 

25import sha3 

26from .utils import is_hex_string_of_length, hex_or_decimal_string_to_int 

27from .ledger_utils import encode_eth_message 

28 

29 

class SignerAuthorization:
    """A signer version together with the signatures that authorize it.

    The JSON representation handled by ``from_jsonfile``/``save_to_jsonfile``
    is an object with three fields: ``version`` (must equal ``VERSION``),
    ``signer`` (an object with ``hash`` and ``iteration``) and ``signatures``
    (a list of hex-encoded DER signatures).
    """

    VERSION = 1  # Only supported version

    @staticmethod
    def from_jsonfile(path):
        """Load a SignerAuthorization from the JSON file at ``path``.

        Raises:
            ValueError: if the content is not valid JSON, is not a JSON
                object, declares an unsupported version, lacks a required
                field, has a field of the wrong shape, or describes an
                invalid signer version or signature.

        Errors raised while opening the file (``OSError``) are not wrapped.
        """
        try:
            with open(path, "r") as file:
                signer_auth_map = json.loads(file.read())

            if not isinstance(signer_auth_map, dict):
                raise ValueError(
                    "JSON file must contain an object as a top level element")

            if signer_auth_map["version"] != SignerAuthorization.VERSION:
                raise ValueError("Unsupported file format version "
                                 f"{signer_auth_map['version']}")

            # Pull out every required field before building any object so
            # that a malformed file is reported before construction starts.
            signer = signer_auth_map["signer"]
            signer_hash = signer["hash"]
            signer_iteration = signer["iteration"]
            signatures = signer_auth_map["signatures"]

            return SignerAuthorization(
                SignerVersion(signer_hash, signer_iteration), signatures)
        except KeyError as e:
            # A missing field used to escape as a bare KeyError; report it
            # the same way as every other malformed-file condition.
            raise ValueError('Unable to read Signer Authorization from "%s": '
                             'missing field %s' % (path, str(e))) from e
        except (ValueError, TypeError) as e:
            # json.JSONDecodeError is a ValueError subclass, so it lands here.
            # TypeError covers fields of the wrong shape (e.g. "signer" being
            # a string or a list instead of an object).
            raise ValueError('Unable to read Signer Authorization from "%s": %s' %
                             (path, str(e))) from e

    @staticmethod
    def for_signer_version(signer_version):
        """Return an authorization for ``signer_version`` with no signatures."""
        return SignerAuthorization(signer_version, [])

    def __init__(self, signer_version, signatures):
        """Create an authorization for ``signer_version``.

        Args:
            signer_version: the SignerVersion being authorized.
            signatures: list of hex-encoded DER signatures. The list is
                copied, so later changes to the argument do not affect
                this instance.

        Raises:
            ValueError: if ``signer_version`` is not a SignerVersion,
                ``signatures`` is not a list, or any signature is invalid.
        """
        if not isinstance(signer_version, SignerVersion):
            raise ValueError(f"Invalid signer version given: {signer_version}")

        # Validate the container type BEFORE slicing it: slicing a non-list
        # (e.g. None) would otherwise raise TypeError instead of ValueError.
        if not isinstance(signatures, list):
            raise ValueError("Signatures must be a list")

        for signature in signatures:
            self._assert_signature_valid(signature)

        self._signer_version = signer_version
        self._signatures = signatures[:]

    @property
    def signer_version(self):
        """The SignerVersion this authorization refers to."""
        return self._signer_version

    @property
    def signatures(self):
        """A copy of the list of signatures collected so far."""
        return self._signatures[:]

    def add_signature(self, signature):
        """Validate ``signature`` and append it to the authorization.

        Raises:
            ValueError: if ``signature`` is not a valid DER signature.
        """
        self._assert_signature_valid(signature)
        self._signatures.append(signature)

    def to_dict(self):
        """Return the JSON-serializable dict form of this authorization."""
        return {
            "version": self.VERSION,
            "signer": self._signer_version.to_dict(),
            "signatures": self._signatures[:],
        }

    def save_to_jsonfile(self, path):
        """Write this authorization to ``path`` as indented JSON."""
        with open(path, "w") as file:
            file.write("%s\n" % json.dumps(self.to_dict(), indent=2))

    def _assert_signature_valid(self, signature):
        """Raise ValueError unless ``signature`` is a hex-encoded DER signature.

        Any failure (non-string input, invalid hex, deserialization error)
        is reported as ValueError.
        """
        try:
            # NOTE(review): the PrivateKey instance appears to be created
            # only to reach ecdsa_deserialize - confirm against secp256k1.
            ec.PrivateKey().ecdsa_deserialize(bytes.fromhex(signature))
        except Exception as e:
            raise ValueError(f"Invalid DER signature: {signature}: {e}") from e

100 

101 

class SignerVersion:
    """A signer identified by a 32-byte hex hash and a 16-bit iteration."""

    def __init__(self, hash, iteration):
        """Validate and store the signer hash and iteration.

        Args:
            hash: 32-byte hex string; stored lowercased.
            iteration: an int, or a string that is converted with
                hex_or_decimal_string_to_int.

        Raises:
            ValueError: if the hash is not a 32-byte hex string or the
                iteration is not an int in the range [0, 2**16).
        """
        hash_ok = is_hex_string_of_length(hash, 32)
        if not hash_ok:
            raise ValueError("Hash must be a 32-byte hex string")

        # Strings are converted first; anything else is range-checked as is.
        value = iteration
        if type(value) is str:
            value = hex_or_decimal_string_to_int(value)

        # Exact type match on purpose: subclasses of int (e.g. bool) are
        # rejected, just like any non-int value.
        fits_in_uint16 = type(value) is int and 0 <= value < (1 << 16)
        if not fits_in_uint16:
            raise ValueError("Invalid iteration (must be a 16-bit unsigned int)")

        self._iteration = value
        self._hash = hash.lower()

    @property
    def hash(self):
        """The lowercased signer hash."""
        return self._hash

    @property
    def iteration(self):
        """The signer iteration as an int."""
        return self._iteration

    @property
    def msg(self):
        """The plain-text authorization message for this signer version."""
        signer_hash, signer_iteration = self._hash, self._iteration
        return f"RSK_powHSM_signer_{signer_hash}_iteration_{str(signer_iteration)}"

    def get_authorization_msg(self):
        """Return ``msg`` passed through encode_eth_message."""
        plain_message = self.msg
        return encode_eth_message(plain_message)

    def get_authorization_digest(self):
        """Return the keccak-256 digest of the authorization message."""
        hasher = sha3.keccak_256(self.get_authorization_msg())
        return hasher.digest()

    def to_dict(self):
        """Return the JSON-serializable dict form of this signer version."""
        result = {}
        result["hash"] = self.hash
        result["iteration"] = self.iteration
        return result

    def __repr__(self):
        signer_hash, signer_iteration = self.hash, self.iteration
        return f"SignerVersion(hash=0x{signer_hash}, iteration={signer_iteration})"