Coverage for admin/dongle_admin.py: 97%
115 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23import os
24import struct
25from enum import IntEnum
26import secp256k1 as ec
27from ledgerblue.comm import getDongle
28from ledgerblue.commException import CommException
31class DongleAdminError(RuntimeError):
32 pass
35class DongleAdminTimeout(RuntimeError):
36 @staticmethod
37 def is_timeout(exc):
38 if type(exc) == CommException and exc.sw == 0x6F00 and exc.message == "Timeout":
39 return True
40 return False
43# Dongle commands
44class _Command(IntEnum):
45 IDENTIFY = 0x04
46 NONCE = 0x50
47 SEND_KEY = 0x51
48 GET_KEY = 0x52
49 SETUP_ENDO = 0xC0
50 SETUP_ENDO_ACK = 0xC2
53class _SubCommand(IntEnum):
54 SEND_KEY_MASTER = 0x00
55 SEND_KEY_EPHEMERAL = 0x80
56 GET_KEY_DEVICE = 0x00
57 GET_KEY_EPHEMERAL = 0x80
60class _Role(IntEnum):
61 MASTER = 0x01
62 DEVICE = 0x02
63 EPHEMERAL = 0x11
64 ENDORSEMENT = 0xFF
67class _EndorsementScheme(IntEnum):
68 SCHEME_ONE = 1
69 SCHEME_TWO = 2
72# Handles low-level communication with an powHSM dongle for
73# some factory/legacy commands
74class DongleAdmin:
75 # APDU prefix
76 CLA = 0xE0
78 TARGET_ID = bytes.fromhex("31100002")
79 NONCE_LENGTH = 8
81 # Enumeration shorthands
82 CMD = _Command
83 SUBCMD = _SubCommand
84 ROLE = _Role
85 ENDORSEMENT_SCHEME = _EndorsementScheme
87 # Dongle exchange timeout
88 DONGLE_TIMEOUT = 10 # seconds
90 def __init__(self, debug):
91 self.debug = debug
93 # Send command to device
94 def _send_command(self, command, data=b"", timeout=DONGLE_TIMEOUT):
95 try:
96 cmd = struct.pack("BB%ds" % len(data), self.CLA, command, data)
97 result = self.dongle.exchange(cmd, timeout=timeout)
98 except (CommException, BaseException) as e:
99 # If this is a dongle timeout, raise a timeout exception
100 if DongleAdminTimeout.is_timeout(e):
101 raise DongleAdminTimeout()
103 # Otherwise, raise a standard error
104 msg = "Error sending command: %s" % str(e)
105 raise DongleAdminError(msg)
106 return result
108 # Connect to the dongle
109 def connect(self):
110 try:
111 self.dongle = getDongle(self.debug)
112 except CommException as e:
113 msg = "Error connecting: %s" % e.message
114 raise DongleAdminError(msg)
116 # Disconnect from dongle
117 def disconnect(self):
118 try:
119 if self.dongle and self.dongle.opened:
120 self.dongle.close()
121 except CommException as e:
122 msg = "Error disconnecting: %s" % e.message
123 raise DongleAdminError(msg)
125 def _ensure_connected(self):
126 if self.dongle is None or not self.dongle.opened:
127 raise DongleAdminError("Connect to dongle first")
129 # Handshake for an interaction
130 # (optional master key can be given so that a
131 # subsequent authorization from user is not needed)
132 def handshake(self, master_key=None):
133 self._ensure_connected()
135 # Identify
136 self._send_command(self.CMD.IDENTIFY,
137 bytes([0x00, 0x00, len(self.TARGET_ID)]) + self.TARGET_ID)
139 # Exchange nonces
140 nonce = os.urandom(self.NONCE_LENGTH)
141 response = self._send_command(self.CMD.NONCE,
142 bytes([0x00, 0x00, self.NONCE_LENGTH]) + nonce)
143 device_nonce = response[
144 4:12] # First 4 bytes are the device batch, we don't need it
146 # Inform master key
147 if master_key is None:
148 master_key = ec.PrivateKey()
149 master_key_pub = master_key.pubkey.serialize(compressed=False)
151 to_sign = bytes([self.ROLE.MASTER]) + master_key_pub
152 signature = master_key.ecdsa_serialize(master_key.ecdsa_sign(bytes(to_sign)))
153 certificate = (bytes([len(master_key_pub)]) + master_key_pub +
154 bytes([len(signature)]) + signature)
155 self._send_command(
156 self.CMD.SEND_KEY,
157 bytes([self.SUBCMD.SEND_KEY_MASTER, 0x00,
158 len(certificate)]) + certificate,
159 )
161 # Generate and inform ephemeral key
162 ephemeral_key = ec.PrivateKey()
163 ephemeral_key_pub = ephemeral_key.pubkey.serialize(compressed=False)
164 to_sign = bytes([self.ROLE.EPHEMERAL]) + nonce + device_nonce + ephemeral_key_pub
165 signature = master_key.ecdsa_serialize(master_key.ecdsa_sign(bytes(to_sign)))
166 certificate = (bytes([len(ephemeral_key_pub)]) + ephemeral_key_pub +
167 bytes([len(signature)]) + signature)
168 self._send_command(
169 self.CMD.SEND_KEY,
170 bytes([self.SUBCMD.SEND_KEY_EPHEMERAL, 0x00,
171 len(certificate)]) + certificate,
172 )
174 # Return the ephemeral key
175 return ephemeral_key
177 # Get the device key alongside its issuer's certificate
178 def get_device_key(self):
179 self._ensure_connected()
181 response = self._send_command(self.CMD.GET_KEY,
182 bytes([self.SUBCMD.GET_KEY_DEVICE, 0x00, 0x00]))
184 # Response has 3 components: certificate header, device public key and signature
185 cert_header_length = response[0]
186 cert_header = bytes(response[1:1 + cert_header_length])
187 response = response[1 + cert_header_length:]
188 dev_key_pub_length = response[0]
189 dev_key_pub = bytes(response[1:1 + dev_key_pub_length])
190 response = response[1 + dev_key_pub_length:]
191 signature_length = response[0]
192 signature = bytes(response[1:1 + signature_length])
194 # Expected signed data is: key role, certificate header, device public key
195 signed_data = bytes([self.ROLE.DEVICE]) + cert_header + dev_key_pub
197 # Don't really know whether this is needed, but request what
198 # should be the device's ephemeral key (for this session?)
199 # Anyway, just ignore the response
200 self._send_command(self.CMD.GET_KEY,
201 bytes([self.SUBCMD.GET_KEY_EPHEMERAL, 0x00, 0x00]))
203 # Return the raw device key along with the raw signature and raw signed data
204 return {
205 "pubkey": dev_key_pub.hex(),
206 "message": signed_data.hex(),
207 "signature": signature.hex(),
208 }
210 def setup_endorsement_key(self, scheme, endorsement_certificate):
211 self._ensure_connected()
213 if scheme not in [1, 2]:
214 raise DongleAdminError(f"Invalid endorsement scheme {scheme}, must be 1 or 2")
216 response = self._send_command(self.CMD.SETUP_ENDO, bytes([scheme, 0x00, 0x00]))
218 endorsement_key_pub = bytes(response[:65])
219 signature = bytes(response[65:])
221 # Expected signed data is: endorsement role + endorsement public key
222 signed_data = bytes([self.ROLE.ENDORSEMENT]) + endorsement_key_pub
224 # Send endorsement certificate in order to confirm setup
225 self._send_command(
226 self.CMD.SETUP_ENDO_ACK,
227 bytes([0x00, 0x00, len(endorsement_certificate)]) + endorsement_certificate,
228 )
230 # Return the raw endorsement public key along with the raw signature
231 # and raw signed data
232 return {
233 "pubkey": endorsement_key_pub.hex(),
234 "message": signed_data.hex(),
235 "signature": signature.hex(),
236 }