Coverage for admin/attestation_utils.py: 100%

66 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23import hashlib 

24import json 

25import re 

26import secp256k1 as ec 

27import requests 

28from pathlib import Path 

29from comm.cstruct import CStruct 

30from .misc import AdminError 

31from .certificate_v2 import HSMCertificateV2, HSMCertificateV2ElementX509 

32 

33 

class PowHsmAttestationMessage(CStruct):
    """
    pow_hsm_message_header

    uint8_t platform 3
    uint8_t ud_value 32
    uint8_t public_keys_hash 32
    uint8_t best_block 32
    uint8_t last_signed_tx 8
    uint8_t timestamp 8
    """

    # NOTE(review): the class docstring above looks like the field layout that
    # the CStruct base class reads (no other layout is declared here), so it is
    # kept verbatim and all documentation lives in comments -- confirm against
    # comm.cstruct before editing it.

    # Textual header that prefixes the binary payload: "POWHSM:5.<digit>::".
    # Group 1 captures the version. The dot is escaped so that only a literal
    # "." is accepted between major and minor version (an unescaped "." would
    # also accept e.g. b"POWHSM:5X3::").
    HEADER_REGEX = re.compile(rb"^POWHSM:(5\.[0-9])::")

    @classmethod
    def is_header(cls, value):
        """Return True if the bytes in value start with a powHSM message header."""
        return cls.HEADER_REGEX.match(value) is not None

    def __init__(self, value, offset=0, little=True, name="powHSM"):
        """Parse a powHSM attestation message from the bytes in value.

        value  -- bytes holding the textual header followed by the payload
        offset -- forwarded (plus the header length) to the CStruct parser
        little -- forwarded unchanged to the CStruct parser
        name   -- label used in the error messages raised here

        Raises ValueError if the header is not recognised or if the length of
        value[offset:] differs from header length plus struct byte length.
        """
        self.name = name

        # Parse header
        # NOTE(review): the header is matched at the very start of value, while
        # the length check and the payload parsing honour offset -- confirm
        # this is the intended meaning of a non-zero offset.
        match = self.HEADER_REGEX.match(value)
        if match is None:
            raise ValueError(
                f"Invalid {self.name} attestation message header: {value.hex()}")

        # Validate total length
        header_length = len(match.group(0))
        expected_length = header_length + self.get_bytelength()
        if len(value[offset:]) != expected_length:
            raise ValueError(f"{self.name} attestation message length "
                             f"mismatch: {value[offset:].hex()}")

        # Grab version
        self.version = match.group(1).decode("ASCII")

        # Parse the rest
        super().__init__(value, offset+header_length, little)

        # Conversions: platform becomes text; timestamp becomes an unsigned
        # integer, always read as big endian regardless of the little flag
        self.platform = self.platform.decode("ASCII")
        self.timestamp = int.from_bytes(self.timestamp, byteorder="big", signed=False)

76 

77 

def load_pubkeys(pubkeys_file_path):
    """Load the given public keys file into a map.

    The file must contain a JSON object whose keys are paths and whose values
    are hex-encoded public keys. Each value is turned into an ec.PublicKey.

    Returns a dict mapping each path to its parsed public key.

    Raises AdminError if the file is not found, if it cannot be parsed as
    JSON, if its top level element is not an object, or if any of the values
    cannot be parsed as a public key.
    """
    try:
        with open(pubkeys_file_path, "r") as file:
            pubkeys_map = json.load(file)

        if not isinstance(pubkeys_map, dict):
            raise AdminError(
                "Public keys file must contain an object as a top level element")

        result = {}
        for path, pubkey in pubkeys_map.items():
            try:
                result[path] = ec.PublicKey(bytes.fromhex(pubkey), raw=True)
            except Exception as e:
                # Any failure here (bad hex, non-string value, invalid point)
                # is reported uniformly; the cause is kept for debugging
                raise AdminError(
                    f"Invalid public key for path {path}: {pubkey}") from e
        return result
    except (FileNotFoundError, ValueError, json.JSONDecodeError) as e:
        raise AdminError('Unable to read public keys from "%s": %s' %
                         (pubkeys_file_path, str(e))) from e

100 

101 

def compute_pubkeys_hash(pubkeys_map):
    """Compute the hash of the given public keys map.

    The result is the sha256 digest of the uncompressed serializations of the
    public keys, concatenated in lexicographical path order.

    Raises AdminError if the map is empty.
    """
    if not len(pubkeys_map):
        raise AdminError("Can't compute the hash of an empty public keys map")

    ordered_paths = sorted(pubkeys_map.keys())
    uncompressed_keys = (
        pubkeys_map[key_path].serialize(compressed=False)
        for key_path in ordered_paths
    )
    # Hashing the concatenation equals feeding each key to update() in turn
    return hashlib.sha256(b"".join(uncompressed_keys)).digest()

114 

115 

def compute_pubkeys_output(pubkeys_map):
    """Build printable lines for the given public keys map.

    Returns a list with one string per path, in lexicographical path order.
    Each string is the path followed by a colon, padded so that all keys
    line up, then a space and the compressed public key in hex.

    An empty map yields an empty list.
    """
    # default=0 keeps max() from raising ValueError on an empty map
    label_width = max(map(len, pubkeys_map.keys()), default=0) + 1
    return [
        f"{(path + ':').ljust(label_width)} "
        f"{pubkeys_map[path].serialize(compressed=True).hex()}"
        for path in sorted(pubkeys_map.keys())
    ]

126 

127 

def get_root_of_trust(path, timeout=30):
    """Load the root of trust certificate element from a file or a URL.

    path    -- path of a PEM file; if no such file exists, it is assumed
               to be a URL and fetched with an HTTP GET request
    timeout -- seconds to wait for the server when fetching from a URL, so
               that an unresponsive host cannot block the caller forever

    Returns the HSMCertificateV2ElementX509 built from the PEM contents,
    using HSMCertificateV2.ROOT_ELEMENT for both of the extra arguments.

    Raises RuntimeError if the URL does not answer with status code 200.
    Exceptions raised by requests itself (including timeouts) are not
    handled here.
    """
    # From file
    if Path(path).is_file():
        return HSMCertificateV2ElementX509.from_pemfile(
            path,
            HSMCertificateV2.ROOT_ELEMENT,
            HSMCertificateV2.ROOT_ELEMENT)

    # Assume URL and try to grab it
    ra_res = requests.get(path, timeout=timeout)
    if ra_res.status_code != 200:
        raise RuntimeError(f"Error fetching root of trust from {path}")
    return HSMCertificateV2ElementX509.from_pem(
        ra_res.content.decode(),
        HSMCertificateV2.ROOT_ELEMENT,
        HSMCertificateV2.ROOT_ELEMENT)