Coverage for tests/admin/test_dongle_admin.py: 100%

120 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23from unittest import TestCase 

24from unittest.mock import call, Mock, patch 

25from admin.dongle_admin import DongleAdmin, DongleAdminError, DongleAdminTimeout 

26import secp256k1 as ec 

27from ledgerblue.commException import CommException 

28 

29import os 

30import struct 

31 

32 

33class TestDongleAdmin(TestCase): 

34 RANDOM_MOCK = os.urandom(8) 

35 PRIVATE_KEY = ec.PrivateKey() 

36 GENERIC_ERROR_MSG = 'error-msg' 

37 

38 @patch("admin.dongle_admin.getDongle") 

39 def setUp(self, getDongleMock): 

40 self.dongle = Mock() 

41 self.getDongleMock = getDongleMock 

42 self.getDongleMock.return_value = self.dongle 

43 self.dongleAdmin = DongleAdmin(True) 

44 self.dongleAdmin.connect() 

45 

46 @patch("admin.dongle_admin.getDongle") 

47 def test_connect_error(self, getDongleMock): 

48 getDongleMock.side_effect = CommException(self.GENERIC_ERROR_MSG) 

49 with self.assertRaises(DongleAdminError) as e: 

50 self.dongleAdmin.connect() 

51 self.assertTrue(getDongleMock.called) 

52 self.assertEqual(f'Error connecting: {self.GENERIC_ERROR_MSG}', str(e.exception)) 

53 

54 def test_disconnect(self): 

55 self.dongle.close = Mock() 

56 self.dongleAdmin.disconnect() 

57 self.assertTrue(self.dongle.close.called) 

58 

59 def test_disconnect_error(self): 

60 self.dongle.close = Mock(side_effect=DongleAdminError(self.GENERIC_ERROR_MSG)) 

61 with self.assertRaises(DongleAdminError) as e: 

62 self.dongleAdmin.disconnect() 

63 self.assertTrue(self.dongle.close.called) 

64 self.assertEqual(self.GENERIC_ERROR_MSG, str(e.exception)) 

65 

66 @patch("secp256k1.PrivateKey", return_value=PRIVATE_KEY) 

67 @patch("os.urandom", return_value=RANDOM_MOCK) 

68 def test_handshake(self, *_): 

69 device_nonce = os.urandom(8) 

70 self.dongle.exchange = Mock(return_value=bytes.fromhex('00' * 4) + device_nonce) 

71 ephemeral_key = self.dongleAdmin.handshake() 

72 

73 exchange_calls = [] 

74 exchange_calls.append(call(bytes([ 

75 0xe0, 0x04, 0x00, 0x00, 0x04, 0x31, 0x10, 0x00, 0x02 

76 ]), timeout=10)) 

77 

78 nonce = self.RANDOM_MOCK 

79 exchange_calls.append(call(bytes([0xe0, 0x50, 0x00, 0x00, 0x08]) + nonce, 

80 timeout=10)) 

81 

82 pub_key = self.PRIVATE_KEY.pubkey.serialize(compressed=False) 

83 to_sign = bytes([0x01]) + pub_key 

84 signature = self.PRIVATE_KEY.ecdsa_serialize( 

85 self.PRIVATE_KEY.ecdsa_sign(bytes(to_sign))) 

86 certificate = (bytes([len(pub_key)]) + pub_key + 

87 bytes([len(signature)]) + signature) 

88 exchange_calls.append(call(bytes([0xe0, 0x51, 0x00, 0x00, len(certificate)]) + 

89 certificate, timeout=10)) 

90 

91 ephemeral_key_pub = ephemeral_key.pubkey.serialize(compressed=False) 

92 to_sign = (bytes([0x11]) + nonce + device_nonce + ephemeral_key_pub) 

93 signature = ephemeral_key.ecdsa_serialize( 

94 ephemeral_key.ecdsa_sign(bytes(to_sign))) 

95 certificate = (bytes([len(ephemeral_key_pub)]) + 

96 ephemeral_key_pub + bytes([len(signature)]) + signature) 

97 exchange_calls.append(call(bytes([0xe0, 0x51, 0x80, 0x00, len(certificate)]) + 

98 certificate, timeout=10)) 

99 

100 self.assertEqual(exchange_calls, self.dongle.exchange.call_args_list) 

101 self.assertEqual(self.PRIVATE_KEY, ephemeral_key) 

102 

103 def test_handshake_not_connected(self): 

104 self.dongle.opened = False 

105 with self.assertRaises(DongleAdminError) as e: 

106 self.dongleAdmin.handshake() 

107 self.assertEqual('Connect to dongle first', str(e.exception)) 

108 

109 def test_handshake_timeout(self): 

110 self.dongle.exchange = Mock(side_effect=CommException('Timeout')) 

111 with self.assertRaises(DongleAdminTimeout): 

112 self.dongleAdmin.handshake() 

113 self.assertTrue(self.dongle.exchange.called) 

114 

115 def test_get_device_key(self): 

116 nonce = bytes.fromhex('aa' * 8) 

117 cert_header = 'cert-header'.encode() 

118 priv_key = ec.PrivateKey() 

119 dev_pub_key = priv_key.pubkey.serialize(compressed=False) 

120 signature = priv_key.ecdsa_serialize(priv_key.ecdsa_sign('a-message'.encode())) 

121 

122 self.dongle.exchange = Mock(return_value=bytes(bytes([len(cert_header)]) + 

123 cert_header + 

124 bytes([len(dev_pub_key)]) + 

125 dev_pub_key + 

126 bytes([len(signature)]) + 

127 signature) + nonce) 

128 

129 expected_return = { 

130 "pubkey": dev_pub_key.hex(), 

131 "message": (bytes([0x02]) + cert_header + dev_pub_key).hex(), 

132 "signature": signature.hex(), 

133 } 

134 

135 self.assertEqual(expected_return, self.dongleAdmin.get_device_key()) 

136 exchange_calls = [] 

137 data = bytes([0x00, 0x00, 0x00]) 

138 exchange_calls.append( 

139 call( 

140 struct.pack("BB%ds" % len(data), 0xE0, 0x52, data), 

141 timeout=10 

142 ) 

143 ) 

144 data = bytes([0x80, 0x00, 0x00]) 

145 exchange_calls.append( 

146 call( 

147 struct.pack("BB%ds" % len(data), 0xE0, 0x52, data), 

148 timeout=10 

149 ) 

150 ) 

151 self.assertEqual(exchange_calls, self.dongle.exchange.call_args_list) 

152 

153 def test_get_device_key_timeout(self): 

154 self.dongle.exchange = Mock(side_effect=CommException('Timeout')) 

155 with self.assertRaises(DongleAdminTimeout): 

156 self.dongleAdmin.get_device_key() 

157 self.assertTrue(self.dongle.exchange.called) 

158 

159 def test_get_device_key_comm_error(self): 

160 self.dongle.exchange = Mock(side_effect=CommException(self.GENERIC_ERROR_MSG)) 

161 with self.assertRaises(DongleAdminError) as e: 

162 self.dongleAdmin.get_device_key() 

163 self.assertTrue(self.dongle.exchange.called) 

164 self.assertEqual('Error sending command: ' 

165 f'{str(CommException(self.GENERIC_ERROR_MSG))}', 

166 str(e.exception)) 

167 

168 def test_setup_endorsement_key(self): 

169 priv_key = ec.PrivateKey() 

170 scheme = 1 

171 endorsement_key_pub = priv_key.pubkey.serialize(compressed=False) 

172 signed_data = bytes([0xff]) + endorsement_key_pub 

173 signature = 'the-signature'.encode() 

174 certificate = 'the-certificate'.encode() 

175 

176 self.dongle.exchange = Mock() 

177 

178 # response for SETUP_ENDO command, we don't use the response of SETUP_ENDO_ACK 

179 self.dongle.exchange.return_value = bytes(endorsement_key_pub + signature) 

180 

181 self.assertEqual( 

182 { 

183 "pubkey": endorsement_key_pub.hex(), 

184 "message": signed_data.hex(), 

185 "signature": signature.hex(), 

186 }, 

187 self.dongleAdmin.setup_endorsement_key(scheme, certificate)) 

188 

189 exchange_calls = [] 

190 data = bytes([scheme, 0x00, 0x00]) 

191 exchange_calls.append( 

192 call( 

193 struct.pack("BB%ds" % len(data), 0xE0, 0xC0, data), 

194 timeout=10 

195 ) 

196 ) 

197 data = bytes([0x00, 0x00, len(certificate)]) + certificate 

198 exchange_calls.append( 

199 call( 

200 struct.pack("BB%ds" % len(data), 0xE0, 0xC2, data), 

201 timeout=10 

202 ) 

203 ) 

204 self.assertEqual(exchange_calls, self.dongle.exchange.call_args_list) 

205 

206 def test_setup_endorsement_key_timeout(self): 

207 self.dongle.exchange = Mock(side_effect=CommException('Timeout')) 

208 with self.assertRaises(DongleAdminTimeout): 

209 self.dongleAdmin.setup_endorsement_key(1, 'certificate'.encode()) 

210 self.assertTrue(self.dongle.exchange.called) 

211 

212 def test_setup_endorsement_key_comm_error(self): 

213 self.dongle.exchange = Mock(side_effect=CommException(self.GENERIC_ERROR_MSG)) 

214 with self.assertRaises(DongleAdminError) as e: 

215 self.dongleAdmin.setup_endorsement_key(1, 'certificate'.encode()) 

216 self.assertTrue(self.dongle.exchange.called) 

217 self.assertEqual('Error sending command: ' 

218 f'{str(CommException(self.GENERIC_ERROR_MSG))}', 

219 str(e.exception))