Coverage for comm/utils.py: 61%
72 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-07-10 13:43 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2021 RSK Labs Ltd
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of
6# this software and associated documentation files (the "Software"), to deal in
7# the Software without restriction, including without limitation the rights to
8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9# of the Software, and to permit persons to whom the Software is furnished to do
10# so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23import re
24from Crypto.Hash import keccak
27def bitwise_and_bytes(bs1, bs2):
28 return bytes(x & y for (x, y) in zip(bs1, bs2))
31def assert_int(obj, key):
32 _assert_key_present(obj, key)
33 if type(obj[key]) != int:
34 raise ValueError("%s is not an integer" % _name_from_key(key))
37def assert_bool(obj, key):
38 _assert_key_present(obj, key)
39 if type(obj[key]) != bool:
40 raise ValueError("%s is not a boolean value" % _name_from_key(key))
43def assert_dict(obj, key):
44 _assert_key_present(obj, key)
45 if type(obj[key]) != dict:
46 raise ValueError("%s is not an object" % _name_from_key(key))
49def assert_hex_hash(obj, key):
50 _assert_key_present(obj, key)
51 if not is_hex_string_of_length(obj[key], 32):
52 raise ValueError(
53 "%s is not a hexadecimal value representing a block hash"
54 % _name_from_key(key)
55 )
58def _assert_keys_present(obj, keys):
59 for key in keys:
60 _assert_key_present(obj, key)
63def _assert_key_present(obj, key):
64 if key not in obj:
65 raise ValueError("%s not present" % _name_from_key(key))
68def _name_from_key(key):
69 name = key.split("_")
70 name[0] = name[0].capitalize()
71 name = " ".join(name)
72 return name
75def is_hex_string_of_length(value, num_bytes, allow_prefix=False):
76 try:
77 if allow_prefix and value.startswith("0x"):
78 value = value[2:]
79 bs = bytes.fromhex(value)
81 return len(bs) == num_bytes
82 except Exception:
83 return False
86def is_nonempty_hex_string(value):
87 try:
88 bs = bytes.fromhex(value)
89 return len(bs) > 0
90 except Exception:
91 return False
94def hex_or_decimal_string_to_int(value):
95 if value.startswith("0x"):
96 return int(value, 16)
98 return int(value, 10)
101def normalize_hex_string(value):
102 if value.startswith("0x"):
103 return value[2:]
105 return value
108def has_nonempty_hex_field(mp, name):
109 return name in mp and \
110 type(mp[name]) == str and \
111 is_nonempty_hex_string(mp[name])
114def has_hex_field_of_length(mp, name, length):
115 return name in mp and \
116 type(mp[name]) == str and \
117 is_hex_string_of_length(mp[name], length)
120def has_field_of_type(mp, name, tp):
121 return name in mp and \
122 type(mp[name]) == tp
125# Utility functions to parse and use a list slice
126# from a string (in the python fashion [nn:mm])
127_SLICE_REGEXP = re.compile("^(-?\\d*):(-?\\d*)$", re.ASCII)
130def is_slice_str(s):
131 return _SLICE_REGEXP.match(s) is not None
134def slice_from_str(s):
135 m = _SLICE_REGEXP.match(s)
137 if m is None:
138 raise ValueError(f'Invalid slice str "{s}"')
140 gs = m.groups()
141 start = 0 if gs[0] == "" else int(gs[0])
142 stop = None if gs[1] == "" else int(gs[1])
144 return slice(start, stop, 1)
147# One round Keccak-256
148def keccak_256(bs):
149 return keccak.new(digest_bits=256).update(bs).digest()