Coverage for admin/dongle_admin.py: 97%

115 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23import os 

24import struct 

25from enum import IntEnum 

26import secp256k1 as ec 

27from ledgerblue.comm import getDongle 

28from ledgerblue.commException import CommException 

29 

30 

class DongleAdminError(RuntimeError):
    """Generic error raised by dongle administration operations.

    Carries no extra state: it only gives callers a dedicated type to
    catch. The human-readable description is the exception message.
    """

33 

34 

class DongleAdminTimeout(RuntimeError):
    """Error type signalling that an exchange with the dongle timed out."""

    @staticmethod
    def is_timeout(exc):
        """Tell whether ``exc`` is a dongle communication timeout.

        An exception is considered a timeout when it is a ``CommException``
        (or a subclass of it) whose status word ``sw`` is ``0x6F00`` and
        whose ``message`` is exactly ``"Timeout"``.

        Args:
            exc: the exception instance to inspect.

        Returns:
            ``True`` if ``exc`` matches the timeout signature above,
            ``False`` otherwise.
        """
        # isinstance (rather than an exact type comparison) so that
        # subclasses of CommException are recognised as well. The attribute
        # checks only run once the type check has passed, so arbitrary
        # exceptions lacking ``sw``/``message`` are safe to pass in.
        return (
            isinstance(exc, CommException)
            and exc.sw == 0x6F00
            and exc.message == "Timeout"
        )

41 

42 

43# Dongle commands 

class _Command(IntEnum):
    """Command bytes sent to the dongle right after the APDU class byte."""

    # Commands issued by DongleAdmin.handshake
    IDENTIFY = 0x04
    NONCE = 0x50
    SEND_KEY = 0x51

    # Command issued by DongleAdmin.get_device_key
    GET_KEY = 0x52

    # Commands issued by DongleAdmin.setup_endorsement_key
    SETUP_ENDO = 0xC0
    SETUP_ENDO_ACK = 0xC2

51 

52 

class _SubCommand(IntEnum):
    """First payload byte selecting the variant of a SEND_KEY/GET_KEY command.

    NOTE: the GET_KEY_* members reuse the numeric values of the SEND_KEY_*
    members, so the enum machinery treats them as aliases of those members.
    They still compare equal to the expected integers, which is all that
    is needed when they are packed into a command payload.
    """

    # Variants of the SEND_KEY command
    SEND_KEY_MASTER = 0x00
    SEND_KEY_EPHEMERAL = 0x80

    # Variants of the GET_KEY command
    GET_KEY_DEVICE = 0x00
    GET_KEY_EPHEMERAL = 0x80

58 

59 

class _Role(IntEnum):
    """Key role identifiers.

    Each value is used as the first byte of the data that gets signed
    (or that is expected to have been signed) for a key of that role.
    """

    MASTER = 0x01
    DEVICE = 0x02
    EPHEMERAL = 0x11
    ENDORSEMENT = 0xFF

65 

66 

class _EndorsementScheme(IntEnum):
    """Endorsement scheme identifiers accepted when setting up an endorsement key."""

    SCHEME_ONE = 1
    SCHEME_TWO = 2

70 

71 

72# Handles low-level communication with a powHSM dongle for 

73# some factory/legacy commands 

class DongleAdmin:
    """Low-level communication with a powHSM dongle for factory/legacy commands.

    A connection must be established with ``connect()`` before calling
    ``handshake()``, ``get_device_key()`` or ``setup_endorsement_key()``;
    otherwise those methods raise ``DongleAdminError``.
    """

    # APDU prefix: first byte of every command sent to the dongle
    CLA = 0xE0

    # Target identifier sent along with the IDENTIFY command
    TARGET_ID = bytes.fromhex("31100002")
    # Size, in bytes, of the host nonce generated during the handshake
    NONCE_LENGTH = 8

    # Enumeration shorthands
    CMD = _Command
    SUBCMD = _SubCommand
    ROLE = _Role
    ENDORSEMENT_SCHEME = _EndorsementScheme

    # Dongle exchange timeout
    DONGLE_TIMEOUT = 10  # seconds

    def __init__(self, debug):
        """Create a disconnected instance.

        Args:
            debug: forwarded as-is to ``getDongle`` when connecting.
        """
        self.debug = debug
        # Start explicitly disconnected so that _ensure_connected() and
        # disconnect() behave properly even if connect() was never called
        self.dongle = None

    def _send_command(self, command, data=b"", timeout=DONGLE_TIMEOUT):
        """Send a command to the device and return its response.

        The message sent is the class byte ``CLA``, followed by the
        ``command`` byte, followed by ``data``.

        Args:
            command: command byte (normally a ``CMD`` member).
            data: payload bytes appended after the command byte.
            timeout: exchange timeout, in seconds.

        Returns:
            Whatever the underlying ``dongle.exchange`` call returns.

        Raises:
            DongleAdminTimeout: if the exchange failed due to a dongle timeout.
            DongleAdminError: on any other error while packing or exchanging.
        """
        try:
            cmd = struct.pack("BB%ds" % len(data), self.CLA, command, data)
            result = self.dongle.exchange(cmd, timeout=timeout)
        except Exception as e:
            # Only regular errors are wrapped: KeyboardInterrupt, SystemExit
            # and the like are left to propagate untouched.

            # If this is a dongle timeout, raise a timeout exception
            if DongleAdminTimeout.is_timeout(e):
                raise DongleAdminTimeout() from e

            # Otherwise, raise a standard error
            msg = "Error sending command: %s" % str(e)
            raise DongleAdminError(msg) from e
        return result

    def connect(self):
        """Connect to the dongle.

        Raises:
            DongleAdminError: if ``getDongle`` fails with a ``CommException``.
        """
        try:
            self.dongle = getDongle(self.debug)
        except CommException as e:
            msg = "Error connecting: %s" % e.message
            raise DongleAdminError(msg) from e

    def disconnect(self):
        """Disconnect from the dongle; does nothing if there is no open connection.

        Raises:
            DongleAdminError: if closing fails with a ``CommException``.
        """
        try:
            if self.dongle and self.dongle.opened:
                self.dongle.close()
        except CommException as e:
            msg = "Error disconnecting: %s" % e.message
            raise DongleAdminError(msg) from e

    def _ensure_connected(self):
        """Raise ``DongleAdminError`` unless there is an open dongle connection."""
        if self.dongle is None or not self.dongle.opened:
            raise DongleAdminError("Connect to dongle first")

    @staticmethod
    def _build_certificate(signer, signed_prefix, pubkey):
        """Build a certificate for ``pubkey`` signed with the ``signer`` key.

        The signed data is ``signed_prefix + pubkey``. The certificate is the
        length-prefixed public key followed by the length-prefixed serialized
        signature (each length takes a single byte).
        """
        signature = signer.ecdsa_serialize(
            signer.ecdsa_sign(bytes(signed_prefix + pubkey)))
        return (bytes([len(pubkey)]) + pubkey +
                bytes([len(signature)]) + signature)

    @staticmethod
    def _read_length_prefixed(buffer, what):
        """Split a one-byte-length-prefixed field off the front of ``buffer``.

        Args:
            buffer: bytes-like object starting with the field's length byte.
            what: name of the field, only used in the error message.

        Returns:
            A ``(field, remainder)`` tuple, with ``field`` converted to ``bytes``.

        Raises:
            DongleAdminError: if ``buffer`` holds fewer bytes than announced.
        """
        if len(buffer) < 1 or len(buffer) < 1 + buffer[0]:
            raise DongleAdminError("Truncated response while reading %s" % what)
        end = 1 + buffer[0]
        return bytes(buffer[1:end]), buffer[end:]

    def handshake(self, master_key=None):
        """Perform the handshake for an interaction with the dongle.

        Identifies the target, exchanges nonces, informs a master key and
        then a freshly generated ephemeral key certified by that master key.

        Args:
            master_key: optional master private key. It can be given so that
                a subsequent authorization from the user is not needed. When
                omitted, a new random key is generated.

        Returns:
            The ephemeral private key generated for this handshake.

        Raises:
            DongleAdminError: if not connected or if any command fails.
            DongleAdminTimeout: if any command times out.
        """
        self._ensure_connected()

        # Identify
        self._send_command(
            self.CMD.IDENTIFY,
            bytes([0x00, 0x00, len(self.TARGET_ID)]) + self.TARGET_ID,
        )

        # Exchange nonces
        nonce = os.urandom(self.NONCE_LENGTH)
        response = self._send_command(
            self.CMD.NONCE,
            bytes([0x00, 0x00, self.NONCE_LENGTH]) + nonce,
        )
        # First 4 bytes are the device batch, we don't need it
        device_nonce = response[4:12]

        # Inform master key (self-signed over role + public key)
        if master_key is None:
            master_key = ec.PrivateKey()
        master_key_pub = master_key.pubkey.serialize(compressed=False)
        certificate = self._build_certificate(
            master_key, bytes([self.ROLE.MASTER]), master_key_pub)
        self._send_command(
            self.CMD.SEND_KEY,
            bytes([self.SUBCMD.SEND_KEY_MASTER, 0x00,
                   len(certificate)]) + certificate,
        )

        # Generate and inform ephemeral key. The master key signs over
        # role + both nonces + ephemeral public key.
        ephemeral_key = ec.PrivateKey()
        ephemeral_key_pub = ephemeral_key.pubkey.serialize(compressed=False)
        certificate = self._build_certificate(
            master_key,
            bytes([self.ROLE.EPHEMERAL]) + nonce + device_nonce,
            ephemeral_key_pub,
        )
        self._send_command(
            self.CMD.SEND_KEY,
            bytes([self.SUBCMD.SEND_KEY_EPHEMERAL, 0x00,
                   len(certificate)]) + certificate,
        )

        # Return the ephemeral key
        return ephemeral_key

    def get_device_key(self):
        """Get the device key alongside its issuer's certificate.

        Returns:
            A dict with hex-encoded strings under the keys ``"pubkey"`` (the
            device public key), ``"message"`` (the data expected to have been
            signed) and ``"signature"`` (the signature itself).

        Raises:
            DongleAdminError: if not connected, if a command fails or if the
                response is shorter than its length prefixes announce.
            DongleAdminTimeout: if a command times out.
        """
        self._ensure_connected()

        response = self._send_command(
            self.CMD.GET_KEY,
            bytes([self.SUBCMD.GET_KEY_DEVICE, 0x00, 0x00]),
        )

        # Response has 3 length-prefixed components:
        # certificate header, device public key and signature
        cert_header, response = self._read_length_prefixed(
            response, "certificate header")
        dev_key_pub, response = self._read_length_prefixed(
            response, "device public key")
        signature, _ = self._read_length_prefixed(response, "signature")

        # Expected signed data is: key role, certificate header, device public key
        signed_data = bytes([self.ROLE.DEVICE]) + cert_header + dev_key_pub

        # Don't really know whether this is needed, but request what
        # should be the device's ephemeral key (for this session?)
        # Anyway, just ignore the response
        self._send_command(
            self.CMD.GET_KEY,
            bytes([self.SUBCMD.GET_KEY_EPHEMERAL, 0x00, 0x00]),
        )

        # Return the raw device key along with the raw signature and raw signed data
        return {
            "pubkey": dev_key_pub.hex(),
            "message": signed_data.hex(),
            "signature": signature.hex(),
        }

    def setup_endorsement_key(self, scheme, endorsement_certificate):
        """Set up the endorsement key for the given scheme.

        Args:
            scheme: endorsement scheme, must equal one of the
                ``ENDORSEMENT_SCHEME`` values (1 or 2).
            endorsement_certificate: bytes sent back to the dongle in order
                to confirm the setup.

        Returns:
            A dict with hex-encoded strings under the keys ``"pubkey"`` (the
            endorsement public key), ``"message"`` (the data expected to have
            been signed) and ``"signature"`` (the signature itself).

        Raises:
            DongleAdminError: if not connected, if the scheme is invalid or
                if a command fails.
            DongleAdminTimeout: if a command times out.
        """
        self._ensure_connected()

        if scheme not in tuple(self.ENDORSEMENT_SCHEME):
            raise DongleAdminError(f"Invalid endorsement scheme {scheme}, must be 1 or 2")

        response = self._send_command(self.CMD.SETUP_ENDO, bytes([scheme, 0x00, 0x00]))

        # First 65 bytes are the endorsement public key, the rest is the signature
        endorsement_key_pub = bytes(response[:65])
        signature = bytes(response[65:])

        # Expected signed data is: endorsement role + endorsement public key
        signed_data = bytes([self.ROLE.ENDORSEMENT]) + endorsement_key_pub

        # Send endorsement certificate in order to confirm setup
        self._send_command(
            self.CMD.SETUP_ENDO_ACK,
            bytes([0x00, 0x00, len(endorsement_certificate)]) + endorsement_certificate,
        )

        # Return the raw endorsement public key along with the raw signature
        # and raw signed data
        return {
            "pubkey": endorsement_key_pub.hex(),
            "message": signed_data.hex(),
            "signature": signature.hex(),
        }
236 }