Coverage for comm/pow.py: 22%

96 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23import hashlib 

24import thirdparty.sha256 

25import logging 

26 

# Module-level logger, registered under the name "pow".
_logger = logging.getLogger("pow")

# Numerator used when converting a difficulty into a target (2^256).
# Taken from rskj
# https://github.com/rsksmart/rskj/blob/32dbe675b2e22f1c804e2d1c8a76b37e2020f05b/rskj-core/src/main/java/co/rsk/util/DifficultyUtils.java#L32 # noqa E501
_MAX_TARGET = 2 ** 256

32 

33 

def difficulty_to_target(difficulty):
    """Convert a difficulty into the corresponding target.

    Difficulties lower than 3 are treated as 3, so the returned value is
    never larger than ``_MAX_TARGET // 3``.

    :param difficulty: the difficulty, as an integer
    :returns: ``_MAX_TARGET`` floor-divided by the (clamped) difficulty
    """
    return _MAX_TARGET // max(3, difficulty)

36 

37 

# Given a BTC coinbase transaction, attempts to extract the RSK merge mining
# hash (if present). Otherwise it will raise a ValueError.
# Based on rskj.
# See https://github.com/rsksmart/rskj/blob/master/rskj-core/src/main/java/co/rsk/validators/ProofOfWorkRule.java#L162 # noqa E501
# for details.
# *** IMPORTANT ***:
# - tx_hex must be a string in hex format
# - the return value is a string in hex format

# Number of leading transaction bytes sliced off as the midstate by
# coinbase_tx_extract_merge_mining_hash
# (equals prefix + trimmed + suffix sizes below: 8 + 40 + 4).
_MIDSTATE_SIZE = 52
# Offset at which the transaction tail starts.
_MIDSTATE_SIZE_TRIMMED = 40
# Zero bytes placed before/after the trimmed midstate by coinbase_tx_get_hash.
_MIDSTATE_PREFIX_SIZE = 8
_MIDSTATE_SUFFIX_SIZE = 4
# Marker searched for in the transaction tail.
_RSK_TAG = b"RSKBLOCK:"
# The last tag must start at a tail position strictly below this value.
_MAX_RSK_TAG_POSITION = 64
# Number of bytes right after the tag that make up the merge mining hash.
_BLOCK_HEADER_HASH_SIZE = 32
# Maximum number of bytes allowed after the merge mining hash.
_MAX_BYTES_AFTER_MERGE_MINING_HASH = 128
# Number of leading bytes read as a big-endian unsigned byte count.
_BYTE_COUNT_LENGTH = 8
# Byte count plus tail length must be strictly greater than this value.
_MIN_COINBASE_TX_SIZE = 64

56 

57 

def coinbase_tx_extract_merge_mining_hash(tx_hex):
    """Extract the RSK merge mining hash from a BTC coinbase transaction.

    The decoded transaction is split into a midstate (its leading bytes) and
    a tail (everything from offset ``_MIDSTATE_SIZE_TRIMMED`` onwards). The
    following checks are then applied, in this order:

    1. ``_RSK_TAG`` must appear in the tail (only its LAST occurrence is
       considered).
    2. That occurrence must start below ``_MAX_RSK_TAG_POSITION``.
    3. The tag must be followed by at least ``_BLOCK_HEADER_HASH_SIZE`` bytes.
    4. At most ``_MAX_BYTES_AFTER_MERGE_MINING_HASH`` bytes may follow the
       hash.
    5. The big-endian byte count stored in the first ``_BYTE_COUNT_LENGTH``
       bytes, plus the tail length, must exceed ``_MIN_COINBASE_TX_SIZE``.

    :param tx_hex: the coinbase transaction, as a hex string
    :returns: the ``_BLOCK_HEADER_HASH_SIZE`` bytes following the last tag,
        as a hex string
    :raises ValueError: if ``tx_hex`` cannot be decoded or any check fails.
        Every failure (including the ones raised by the checks themselves)
        is re-raised with a "Can't extract merge mining hash ..." prefix,
        chained to the original exception.
    """
    try:
        tx = bytes.fromhex(tx_hex)
        tx_midstate = tx[:_MIDSTATE_SIZE]
        tx_tail = tx[_MIDSTATE_SIZE_TRIMMED:]
        last_tag_pos = tx_tail.rfind(_RSK_TAG)

        if last_tag_pos == -1:
            message = "Couldn't find RSK tag '%s' in tail '%s'" % (
                _RSK_TAG.hex(),
                tx_tail.hex(),
            )
            _logger.info(message)
            raise ValueError(message)

        if last_tag_pos >= _MAX_RSK_TAG_POSITION:
            message = (
                "RSK tag '%s' position in tail '%s' is bigger than expected (%d)"
                % (_RSK_TAG.hex(), tx_tail.hex(), _MAX_RSK_TAG_POSITION)
            )
            _logger.info(message)
            raise ValueError(message)

        # Offsets (within the tail) delimiting the hash that follows the tag
        hash_start = last_tag_pos + len(_RSK_TAG)
        hash_end = hash_start + _BLOCK_HEADER_HASH_SIZE

        expected_tag_size = len(_RSK_TAG) + _BLOCK_HEADER_HASH_SIZE
        available_size = len(tx_tail) - last_tag_pos
        if available_size < expected_tag_size:
            message = (
                "Last RSK tag '%s' found in tail '%s' is not long enough "
                "(expected at least %d bytes, got %d bytes)"
                % (
                    _RSK_TAG.hex(),
                    tx_tail.hex(),
                    expected_tag_size,
                    available_size,
                )
            )
            _logger.info(message)
            raise ValueError(message)

        remaining_tail = tx_tail[hash_end:]

        if len(remaining_tail) > _MAX_BYTES_AFTER_MERGE_MINING_HASH:
            message = "More than %d bytes after RSK tag" % (
                _MAX_BYTES_AFTER_MERGE_MINING_HASH
            )
            _logger.info(message)
            raise ValueError(message)

        # The midstate starts with the number of bytes already hashed
        byte_count = int.from_bytes(
            tx_midstate[:_BYTE_COUNT_LENGTH], byteorder="big", signed=False
        )
        coinbase_tx_length = byte_count + len(tx_tail)
        if coinbase_tx_length <= _MIN_COINBASE_TX_SIZE:
            message = (
                "Coinbase transaction must be longer than %d bytes (got %d bytes)"
                % (_MIN_COINBASE_TX_SIZE, coinbase_tx_length)
            )
            _logger.info(message)
            raise ValueError(message)

        merged_mining_hash = tx_tail[hash_start:hash_end].hex()
        _logger.info("Found merge mining hash in coinbase TX: %s", merged_mining_hash)

        return merged_mining_hash
    except Exception as e:
        # Normalise every failure (bad hex, wrong type, failed check) into a
        # ValueError, keeping the original exception as the explicit cause.
        message = "Can't extract merge mining hash from coinbase tx: %s" % str(e)
        _logger.info(message)
        raise ValueError(message) from e

130 

131 

def coinbase_tx_get_hash(tx_hex):
    """Compute the hash of a coinbase transaction given in midstate form.

    The first ``_MIDSTATE_SIZE_TRIMMED`` bytes of the decoded input are padded
    with ``_MIDSTATE_PREFIX_SIZE`` zero bytes before and
    ``_MIDSTATE_SUFFIX_SIZE`` zero bytes after, and loaded as the midstate of
    a ``thirdparty.sha256.SHA256`` instance. That instance is then fed the
    remaining bytes. Its digest is hashed once more with SHA-256 and the
    final digest is byte-reversed.

    :param tx_hex: the coinbase transaction, as a hex string
    :returns: the byte-reversed double hash, as a hex string
    :raises ValueError: if anything goes wrong while decoding or hashing; the
        original exception is attached as the cause
    """
    try:
        tx = bytes.fromhex(tx_hex)
        # bytes(n) yields n zero bytes
        tx_midstate = (
            bytes(_MIDSTATE_PREFIX_SIZE)
            + tx[:_MIDSTATE_SIZE_TRIMMED]
            + bytes(_MIDSTATE_SUFFIX_SIZE)
        )
        tx_tail = tx[_MIDSTATE_SIZE_TRIMMED:]

        # First round: resume hashing from the given midstate
        first_round = thirdparty.sha256.SHA256()
        first_round.set_midstate(tx_midstate)
        first_round.update(tx_tail)
        first_digest = first_round.digest()

        # Second round: plain SHA-256 over the first digest, then reverse
        second_digest = hashlib.sha256(first_digest).digest()
        coinbase_tx_hash = bytes(reversed(second_digest)).hex()

        _logger.info("Coinbase TX hash: %s", coinbase_tx_hash)

        return coinbase_tx_hash
    except Exception as e:
        message = "Can't compute coinbase tx hash: %s" % str(e)
        _logger.info(message)
        raise ValueError(message) from e

156 

157 

# Given a merkle proof in the format described in RSKIP92
# (https://github.com/rsksmart/RSKIPs/blob/master/IPs/RSKIP92.md)
# this function returns True iff the given merkle proof is a valid
# proof of the coinbase tx for the given root hash
# *** IMPORTANT ***: all values are expected to be hex strings

# Size, in bytes, of each hash within a merkle proof.
_SHA256_HASH_LENGTH = 32

164 

165 

def is_valid_merkle_proof(merkle_proof_hex, root_hex, coinbase_tx_hash_hex):
    """Tell whether a merkle proof links a coinbase tx hash to a root.

    The proof is read as a concatenation of ``_SHA256_HASH_LENGTH``-byte
    hashes. Starting from the coinbase tx hash, each of them is combined (as
    the right-hand node) with the running hash using ``combine_left_right``.
    The final hash is byte-reversed and compared with the root.

    :param merkle_proof_hex: the merkle proof, as a hex string
    :param root_hex: the expected merkle root, as a hex string
    :param coinbase_tx_hash_hex: the coinbase tx hash, as a hex string
    :returns: True iff the proof leads to the given root; False if it
        doesn't, or if the proof length isn't a multiple of
        ``_SHA256_HASH_LENGTH``
    :raises ValueError: if any of the arguments can't be decoded from hex
    """
    try:
        merkle_proof = bytes.fromhex(merkle_proof_hex)
        root = bytes.fromhex(root_hex)
        coinbase_tx_hash = bytes.fromhex(coinbase_tx_hash_hex)
    except Exception as e:
        _logger.info(str(e))
        raise ValueError(str(e)) from e

    # Verify merkle proof length
    if len(merkle_proof) % _SHA256_HASH_LENGTH != 0:
        _logger.info("Merkle proof length is invalid (%d)", len(merkle_proof))
        return False

    # Extract merkle proof hashes
    hashes = [
        merkle_proof[start:start + _SHA256_HASH_LENGTH]
        for start in range(0, len(merkle_proof), _SHA256_HASH_LENGTH)
    ]

    # Reduce
    current_left = coinbase_tx_hash
    for right in hashes:
        current_left = combine_left_right(current_left, right)

    # We should have gotten to the root in case of a valid proof
    # (the root is compared against the byte-reversed result)
    return root == bytes(reversed(current_left))

196 

197 

def combine_left_right(left, right):
    """Combine two merkle tree nodes into the hash of their parent.

    Both inputs are byte-reversed and concatenated (left first), the result
    is hashed twice with SHA-256, and the final digest is byte-reversed.

    :param left: hash of the left node, as bytes
    :param right: hash of the right node, as bytes
    :returns: hash of the parent node, as bytes
    """
    to_hash = bytes(reversed(left)) + bytes(reversed(right))
    first_digest = hashlib.sha256(to_hash).digest()
    double_hash = hashlib.sha256(first_digest).digest()
    return bytes(reversed(double_hash))