Coverage for sgx/hsm2dongle.py: 100%

89 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23from enum import IntEnum 

24from ledger.hsm2dongle import HSM2DongleError 

25from ledger.hsm2dongle_tcp import HSM2DongleTCP 

26 

27 

class SgxCommand(IntEnum):
    """Command bytes for the SGX-specific operations.

    Each member is passed as the command argument of ``_send_command`` by
    the ``HSM2DongleSGX`` methods in this module.
    """

    # NOTE: values are plain ints. A trailing comma after the value would
    # turn it into a one-element tuple that IntEnum silently unpacks.
    SGX_ONBOARD = 0xA0
    SGX_RETRIES = 0xA2
    SGX_UNLOCK = 0xA3
    SGX_ECHO = 0xA4
    SGX_CHANGE_PASSWORD = 0xA5
    SGX_UPGRADE = 0xA6

35 

36 

class SgxUpgradeOps(IntEnum):
    """Operation byte that follows ``SgxCommand.SGX_UPGRADE`` in a message.

    Used by the ``migrate_db_*`` methods of ``HSM2DongleSGX``:
    ``START`` and ``SPEC_SIG`` by ``migrate_db_spec``, ``IDENTIFY_SELF`` by
    ``migrate_db_get_evidence``, ``IDENTIFY_PEER`` by
    ``migrate_db_send_evidence`` and ``PROCESS_DATA`` by
    ``migrate_db_get_data`` / ``migrate_db_send_data``.
    """

    # NOTE: values are plain ints. A trailing comma after the value would
    # turn it into a one-element tuple that IntEnum silently unpacks.
    START = 0x01
    SPEC_SIG = 0x02
    IDENTIFY_SELF = 0x03
    IDENTIFY_PEER = 0x04
    PROCESS_DATA = 0x05

43 

44 

class SgxUpgradeRoles(IntEnum):
    """Role byte sent along with the migration spec.

    ``HSM2DongleSGX.migrate_db_spec`` serialises its ``role`` argument as a
    single byte right after ``SgxUpgradeOps.START``.
    """

    # NOTE: values are plain ints. A trailing comma after the value would
    # turn it into a one-element tuple that IntEnum silently unpacks.
    EXPORTER = 0x01
    IMPORTER = 0x02

48 

49 

# Width, in bytes, of the big-endian total-length prefix that precedes the
# first chunk of evidence sent to the device
EVIDENCE_LEN_BYTES = 2
# Maximum number of evidence bytes sent to the device in a single message
EVIDENCE_CHUNK_SIZE = 80

52 

53 

class HSM2DongleSGX(HSM2DongleTCP):
    """HSM2DongleTCP extension implementing the SGX-specific commands.

    Adds echo, unlock, pin change, retries, onboarding and the database
    migration operations, and overrides ``_send_data_in_chunks`` so that the
    commands listed in ``SGX_SPECIFIC_COMMANDS`` are sent in one message.

    Throughout this class, ``response[2]`` is the result byte of an
    SGX-specific command and ``response[3:]`` is its payload, if any.
    """

    def echo(self):
        """Send a fixed three-byte message and check it is echoed back.

        Returns:
            True if the response is exactly ``CLA``, the echo command byte
            and the message that was sent; False otherwise.
        """
        message = bytes([0x41, 0x42, 0x43])
        result = bytes(self._send_command(SgxCommand.SGX_ECHO, message))
        # Result should be the command plus the message
        expected_result = bytes([self.CLA, SgxCommand.SGX_ECHO]) + message
        return result == expected_result

    def unlock(self, pin):
        """Attempt to unlock the device with the given pin.

        Args:
            pin: bytes-like pin, sent after a single zero byte.

        Returns:
            True if the result byte of the response is nonzero.
        """
        response = self._send_command(SgxCommand.SGX_UNLOCK, bytes([0]) + pin)

        # Nonzero indicates device unlocked
        return response[2] != 0

    def new_pin(self, pin):
        """Change the device pin.

        Args:
            pin: bytes-like new pin, sent after a single zero byte.

        Returns:
            True if the result byte of the response equals one.
        """
        response = self._send_command(
            SgxCommand.SGX_CHANGE_PASSWORD, bytes([0]) + pin)

        # One indicates pin changed
        return response[2] == 1

    def get_retries(self):
        """Return the number of pin retries available, as reported by the
        result byte of the response."""
        apdu_rcv = self._send_command(SgxCommand.SGX_RETRIES)
        return apdu_rcv[2]

    def onboard(self, seed, pin):
        """Attempt to onboard the device using the given seed and pin.

        Args:
            seed: bytes of length ``self.ONBOARDING.SEED_LENGTH``.
            pin: bytes.

        Returns:
            True on success.

        Raises:
            HSM2DongleError: if the seed or pin are invalid, or if the
                result byte of the response is not one.
        """
        if not isinstance(seed, bytes) or \
                len(seed) != self.ONBOARDING.SEED_LENGTH:
            raise HSM2DongleError("Invalid seed given")

        if not isinstance(pin, bytes):
            raise HSM2DongleError("Invalid pin given")

        self.logger.info("Sending onboard command")
        response = self._send_command(
            SgxCommand.SGX_ONBOARD, bytes([0x0]) + seed + pin)

        if response[2] != 1:
            raise HSM2DongleError("Error onboarding. Got '%s'" % response.hex())

        return True

    # Migration operations
    def migrate_db_spec(self, role, source_mre, destination_mre, signatures):
        """Send the migration spec followed by its signatures.

        Signatures are sent one at a time, stopping as soon as the result
        byte of a response is zero.

        Args:
            role: role byte value (see ``SgxUpgradeRoles``).
            source_mre: bytes-like, sent right after the role byte.
            destination_mre: bytes-like, sent right after ``source_mre``.
            signatures: iterable of bytes-like signatures.

        Raises:
            HSM2DongleError: if every signature was sent (or none were given)
                without the device replying with a zero result byte.
        """
        # Send spec and role
        self._send_command(
            SgxCommand.SGX_UPGRADE,
            bytes([SgxUpgradeOps.START]) +
            bytes([role]) + source_mre + destination_mre)

        # Send signatures
        for signature in signatures:
            response = self._send_command(
                SgxCommand.SGX_UPGRADE,
                bytes([SgxUpgradeOps.SPEC_SIG]) + signature)

            # Zero indicates the device needs no more signatures
            if response[2] == 0:
                return

        # Reached both when the signatures run out and when none were given
        raise HSM2DongleError("Not enough correct signatures gathered")

    def migrate_db_get_evidence(self):
        """Gather this device's evidence.

        Keeps requesting chunks until the result byte of a response is zero;
        the payload of that last response is included as well.

        Returns:
            The concatenated payloads, as bytes.
        """
        evidence = bytearray()
        while True:
            response = self._send_command(
                SgxCommand.SGX_UPGRADE,
                bytes([SgxUpgradeOps.IDENTIFY_SELF]))

            evidence.extend(response[3:])

            if response[2] == 0:
                break

        return bytes(evidence)

    def migrate_db_send_evidence(self, evidence):
        """Send the peer's evidence to the device in chunks.

        The first message carries the total evidence length as an
        ``EVIDENCE_LEN_BYTES``-wide big-endian prefix. Each message carries
        at most ``EVIDENCE_CHUNK_SIZE`` bytes of evidence. Sending stops once
        a response has a zero result byte or all the evidence has been sent.

        Args:
            evidence: bytes-like evidence.

        Raises:
            HSM2DongleError: if the result byte of the last response
                received is nonzero.
            OverflowError: if the evidence length does not fit in
                ``EVIDENCE_LEN_BYTES`` bytes.
        """
        evlen = len(evidence).to_bytes(
            EVIDENCE_LEN_BYTES, byteorder="big", signed=False)
        offset = 0
        while True:
            # Only the very first message is prefixed with the total length
            prefix = evlen if offset == 0 else bytes([])
            chunk = evidence[offset:offset+EVIDENCE_CHUNK_SIZE]
            response = self._send_command(
                SgxCommand.SGX_UPGRADE,
                bytes([SgxUpgradeOps.IDENTIFY_PEER]) + prefix + chunk)
            offset += EVIDENCE_CHUNK_SIZE
            if response[2] == 0 or offset >= len(evidence):
                break

        if response[2] != 0:
            raise HSM2DongleError("Failed to receive evidence ack")

    def migrate_db_get_data(self):
        """Request the migration data from the device.

        Returns:
            The payload of the response.

        Raises:
            HSM2DongleError: if the payload is empty.
        """
        data = self._send_command(
            SgxCommand.SGX_UPGRADE,
            bytes([SgxUpgradeOps.PROCESS_DATA]))[3:]

        if len(data) == 0:
            raise HSM2DongleError("Migration data gathering failed. "
                                  "Expected data but got 0 bytes instead")

        return data

    def migrate_db_send_data(self, data):
        """Send the given bytes-like migration data to the device."""
        self._send_command(
            SgxCommand.SGX_UPGRADE,
            bytes([SgxUpgradeOps.PROCESS_DATA]) + data)

    # Standard commands whose data is sent to the device in a single message
    # instead of going through the inherited chunked exchange
    SGX_SPECIFIC_COMMANDS = [
        HSM2DongleTCP.CMD.ADVANCE, HSM2DongleTCP.CMD.UPD_ANCESTOR
    ]

    # Send a specific piece of data to the device.
    # Validate responses wrt current operation and next possible expected operations
    # Exceptions are to be handled by the caller
    def _send_data_in_chunks(
        self,
        command,
        operation,
        next_operations,
        data,
        expect_full_data,
        initial_bytes,
        operation_name,
        data_description,
    ):
        """Send ``data`` for the given command and operation.

        Commands not listed in ``SGX_SPECIFIC_COMMANDS`` are delegated,
        unchanged, to the parent implementation. For the listed commands the
        data is sent in full in a single message and the operation the device
        replies with is validated.

        Args:
            command: command to send.
            operation: operation byte value that prefixes ``data``.
            next_operations: container of operations the device may validly
                reply with.
            data: bytes-like payload.
            expect_full_data: only forwarded to the parent implementation.
            initial_bytes: only forwarded to the parent implementation.
            operation_name: name used when logging an unexpected response.
            data_description: only forwarded to the parent implementation.

        Returns:
            For the listed commands, a ``(success, response)`` tuple where
            ``success`` is False if the device replied with an operation
            outside ``next_operations`` or with ``operation`` itself. For any
            other command, whatever the parent implementation returns.
        """
        # Same old behavior for anything that hasn't got an SGX-specific
        # mapping
        if command not in self.SGX_SPECIFIC_COMMANDS:
            return super()._send_data_in_chunks(
                command,
                operation,
                next_operations,
                data, expect_full_data,
                initial_bytes,
                operation_name,
                data_description)

        # Send data in full
        response = self._send_command(command, bytes([operation]) + data)

        # We expect the device to ask for one of the next operations but
        # not the current chunk operation
        # If it doesn't happen, error out
        requested = response[self.OFF.OP]
        if requested not in next_operations or requested == operation:
            self.logger.debug(
                "Current operation %s, next operations %s, ledger requesting %s",
                hex(operation),
                str(list(map(hex, next_operations))),
                hex(requested),
            )
            self.logger.error(
                "%s: unexpected response %s",
                operation_name.capitalize(),
                response.hex(),
            )
            return (False, response)

        # All is good
        return (True, response)

217 

218 # All is good 

219 return (True, response)