Coverage for comm/cstruct.py: 98%

107 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-07-10 13:43 +0000

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2021 RSK Labs Ltd 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of 

6# this software and associated documentation files (the "Software"), to deal in 

7# the Software without restriction, including without limitation the rights to 

8# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 

9# of the Software, and to permit persons to whom the Software is furnished to do 

10# so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23 

24import struct 

25import re 

26 

27 

class CStruct:
    # Base class for parsing packed, C-like structures out of byte buffers.
    #
    # NOTE: this base class is documented with comments rather than a
    # docstring because `_spec` and `_typename` parse `cls.__doc__` as data.
    #
    # A subclass declares its layout in its own docstring. `_spec` ignores
    # the first docstring line, then takes the first non-empty line as the
    # type name and every following non-empty line as a field of the form
    #
    #     <type> <name> [<length>]
    #
    # where <type> is a key of MAP or the type name of another class returned
    # by `cls.__base__.__subclasses__()`, and <length> (decimal) is accepted
    # only for types whose MAP entry is a list (i.e. "uint8_t", which then
    # becomes a bytes field of that many bytes).

    # C type name -> `struct` format code. A list entry holds
    # [code for a single value, code used when a length is given].
    MAP = {
        "uint8_t": ["B", "s"],
        "uint16_t": "H",
        "uint32_t": "I",
        "uint64_t": "Q",
    }
    # Cache of parsed specs, keyed by the `little` flag; set by `_spec`.
    SPEC = None
    # Cache of the type name; set by `_typename`.
    TYPENAME = None

    @classmethod
    def _spec(cls, little=True):
        """Parse (and cache) the layout declared in the class docstring.

        Args:
            little: True for little-endian ("<"), False for big-endian (">").

        Returns:
            A tuple (struct.Struct, {field name: index}, [field names],
            [nested CStruct class or None per field], type name).

        Raises:
            ValueError: on an unknown type, a length given for a type that
                does not accept one, a non-numeric length, or a field line
                without a name.
        """
        if cls.SPEC is None or little not in cls.SPEC:
            fmt = "<" if little else ">"
            atrmap = {}
            names = []
            types = []
            typename = None
            for line in cls.__doc__.split("\n")[1:]:
                # Collapse whitespace runs so fields split on single spaces
                line = re.sub(r"\s+", " ", line.strip())
                if line == "":
                    continue
                # The first non-empty line is the type name, not a field
                if typename is None:
                    typename = line
                    continue
                tspec = line.split(" ")

                length = "" if len(tspec) < 3 else str(int(tspec[2], 10))

                typ = tspec[0]
                derived_type = None
                if typ in cls.MAP:
                    actual_type = cls.MAP[typ]
                else:
                    # Not a primitive: look for a sibling class declaring
                    # this type name (the last match wins)
                    for kls in cls.__base__.__subclasses__():
                        if cls != kls and typ == kls._typename():
                            derived_type = kls
                    if derived_type is None:
                        raise ValueError(f"Invalid type: {typ}")
                    actual_type = derived_type

                # Only list-valued MAP entries accept an explicit length
                if length != "" and not isinstance(actual_type, list):
                    raise ValueError(f"Invalid type spec: {line}")

                # A field needs a name; fail with the same error style as
                # above instead of leaking an IndexError
                if len(tspec) < 2:
                    raise ValueError(f"Invalid type spec: {line}")
                name = tspec[1]

                if isinstance(actual_type, list):
                    actual_type = actual_type[0] if length == "" else actual_type[1]
                elif not isinstance(actual_type, str) and \
                        issubclass(actual_type, cls.__base__):
                    # Nested structs are unpacked as raw bytes of their own
                    # size and wrapped into instances in __init__
                    actual_type = str(actual_type.get_bytelength(little)) + "s"

                fmt += length + actual_type
                atrmap[name] = len(names)
                names.append(name)
                types.append(derived_type)
            if cls.SPEC is None:
                cls.SPEC = {}
            cls.SPEC[little] = (struct.Struct(fmt), atrmap, names, types, typename)

        return cls.SPEC[little]

    @classmethod
    def _struct(cls, little=True):
        """Return the compiled struct.Struct for the given endianness."""
        return cls._spec(little)[0]

    @classmethod
    def _atrmap(cls, little=True):
        """Return the mapping from field name to field index."""
        return cls._spec(little)[1]

    @classmethod
    def _names(cls, little=True):
        """Return the field names in declaration order."""
        return cls._spec(little)[2]

    @classmethod
    def _types(cls, little=True):
        """Return, per field, the nested CStruct class or None."""
        return cls._spec(little)[3]

    @classmethod
    def _typename(cls):
        """Return (and cache) the first non-empty line of the class docstring."""
        if cls.TYPENAME is None:
            for line in cls.__doc__.split("\n"):
                line = re.sub(r"\s+", " ", line.strip())
                if line == "":
                    continue
                cls.TYPENAME = line
                break

        return cls.TYPENAME

    @classmethod
    def get_bytelength(cls, little=True):
        """Return the size in bytes of the packed structure."""
        return cls._struct(little).size

    def __init__(self, value, offset=0, little=True):
        """Parse the structure from `value`, starting at `offset`.

        Args:
            value: buffer to unpack from (passed to Struct.unpack_from).
            offset: position in `value` where the structure starts.
            little: True for little-endian, False for big-endian.

        Raises:
            ValueError: if unpacking fails; the original exception is kept
                as `__cause__`.
        """
        self._offset = offset
        self._little = little
        self._raw_value = value

        try:
            self._parsed = list(self._struct(little).unpack_from(value, offset))
        except Exception as e:
            raise ValueError(f"While parsing: {e}") from e

        # Replace the raw bytes of nested structs with parsed instances
        for index, derived_type in enumerate(self._types(little)):
            if derived_type is not None:
                self._parsed[index] = derived_type(self._parsed[index], little=little)

    def _value(self, name):
        """Return the parsed value of field `name`.

        Raises:
            NameError: if the structure declares no such field.
        """
        amap = self._atrmap(self._little)
        if name in amap:
            return self._parsed[amap[name]]
        raise NameError(f"Property {name} does not exist")

    def __getattr__(self, name):
        """Expose declared fields as attributes.

        Raises:
            AttributeError: if the instance has not been initialised yet.
            NameError: if the structure declares no such field.
        """
        # On an instance whose __init__ has not run (e.g. one created via
        # __new__ by copy/pickle), reading self._little inside _value would
        # re-enter this method without end. Raise AttributeError instead so
        # that hasattr()/getattr() probes behave normally.
        if "_little" not in self.__dict__:
            raise AttributeError(name)
        return self._value(name)

    def get_raw_data(self):
        """Return the slice of the source buffer covered by this structure."""
        return self._raw_value[
            self._offset:self._offset+self.get_bytelength(self._little)]

    def to_dict(self):
        """Return the fields as a dict, in declaration order.

        Bytes values are hex-encoded and nested structures are converted
        recursively.
        """
        result = {}
        for name in self._names(self._little):
            value = self._value(name)
            if isinstance(value, bytes):
                value = value.hex()
            result[name] = value.to_dict() if isinstance(value, CStruct) else value
        return result

    def __repr__(self):
        """Return "<ClassName: {fields as dict}>"."""
        return f"<{self.__class__.__name__}: {self.to_dict()}>"