# protocol.py
import struct
from dataclasses import dataclass
from typing import Any, List, Union, Tuple, Optional

from .variables import variables, Variable, FunctionCodes
from .crc import crc16
  6. type Value = str|int|float
  7. @dataclass
  8. class Result(Variable):
  9. value: Value
  10. class ResultContainer:
  11. def __init__(self, results: List[Result]):
  12. self._results = results
  13. self._result_map = {res.name: res for res in results}
  14. def __getitem__(self, key: Union[int, str, slice]) -> Result:
  15. if isinstance(key, int):
  16. return self._results[key]
  17. elif isinstance(key, str):
  18. return self._result_map[key]
  19. elif isinstance(key, slice):
  20. return ResultContainer(self._results[key])
  21. else:
  22. raise TypeError("Key must be an integer index or a result name string.")
  23. def __len__(self):
  24. return len(self._results)
  25. def __iter__(self):
  26. return iter(self._results)
  27. def __add__(self, other) -> list[Result]:
  28. return ResultContainer(self._results + other._results)
  29. def __bool__(self):
  30. return len(self._results) > 0
  31. def items(self):
  32. return self._result_map.items()
  33. def get(self, key: str) -> Optional[Result]:
  34. if isinstance(key, str):
  35. return self._result_map.get(key)
  36. else:
  37. raise TypeError("Key must be a variable name string.")
  38. class LumiaxClient:
  39. def __init__(self):
  40. self.device_id = 0xFE
  41. def bytes_to_value(self, variable: Variable, buffer: bytes, offset: int) -> Value:
  42. if variable.is_32_bit and variable.is_signed:
  43. raw_value = struct.unpack_from(">H", buffer, offset)[0] | struct.unpack_from(">h", buffer, offset + 2)[0] << 16
  44. elif variable.is_32_bit:
  45. raw_value = struct.unpack_from(">H", buffer, offset)[0] | struct.unpack_from(">H", buffer, offset + 2)[0] << 16
  46. elif variable.is_signed:
  47. raw_value = struct.unpack_from(">h", buffer, offset)[0]
  48. else:
  49. raw_value = struct.unpack_from(">H", buffer, offset)[0]
  50. if variable.multiplier:
  51. value = raw_value / variable.multiplier
  52. elif variable.func:
  53. try:
  54. value = variable.func(raw_value)
  55. except IndexError as e:
  56. raise Exception(f"unexpected value for {variable.name} ({hex(variable.address)}): '{raw_value}'")
  57. else:
  58. value = raw_value
  59. return value
  60. def value_to_bytes(self, variable: Variable, buffer: bytearray, offset: int, value: Value) -> int:
  61. if variable.multiplier and not variable.func:
  62. raw_value = round(float(value) * variable.multiplier)
  63. elif variable.func:
  64. raw_value = self._find_raw_value_by_brute_force(variable, value)
  65. if raw_value == None:
  66. raise Exception(f"invalid value for {variable.name}: '{value}'")
  67. elif variable.binary_payload and value == variable.binary_payload[0]:
  68. raw_value = 0xFF00
  69. elif variable.binary_payload and value == variable.binary_payload[1]:
  70. raw_value = 0
  71. elif variable.binary_payload:
  72. raise Exception(f"invalid binary value for {variable.name}: '{value}'")
  73. else:
  74. raw_value = int(value)
  75. if variable.is_32_bit and variable.is_signed:
  76. struct.pack_into(">H", buffer, offset, raw_value & 0xFFFF)
  77. struct.pack_into(">h", buffer, offset + 2, raw_value >> 16)
  78. elif variable.is_32_bit:
  79. struct.pack_into(">H", buffer, offset, raw_value & 0xFFFF)
  80. struct.pack_into(">H", buffer, offset + 2, raw_value >> 16)
  81. elif variable.is_signed:
  82. struct.pack_into(">h", buffer, offset, raw_value)
  83. else:
  84. struct.pack_into(">H", buffer, offset, raw_value)
  85. length = 4 if variable.is_32_bit else 2
  86. return offset + length
  87. def get_read_command(self, device_id: int, start_address: int, count: int) -> bytes:
  88. items = [v for v in variables if v.address >= start_address and v.address < start_address + count]
  89. if not items:
  90. raise Exception(f"the range {hex(start_address)}-{hex(start_address+count-1)} contains no variables")
  91. function_code = items[0].function_codes[0]
  92. if not all(function_code in v.function_codes for v in items):
  93. raise Exception(f"the range {hex(start_address)}-{hex(start_address+count-1)} spans multiple function codes")
  94. result = bytes([
  95. device_id,
  96. function_code,
  97. start_address >> 8,
  98. start_address & 0xFF,
  99. count >> 8,
  100. count & 0xFF
  101. ])
  102. return result + crc16(result)
  103. def get_write_command(self, device_id: int, results: list[Result]) -> Tuple[int, bytes]:
  104. if not results:
  105. raise Exception(f"values list is empty")
  106. results.sort(key=lambda x: x.address)
  107. address = results[0].address
  108. for result in results:
  109. if result.value is None:
  110. raise Exception(f"value of {result.name} ({hex(result.address)}) is empty")
  111. if address < result.address:
  112. raise Exception(f"variables are not continuous at {hex(result.address)}")
  113. address = result.address + (2 if result.is_32_bit else 1)
  114. start_variable = results[0]
  115. end_variable = results[-1]
  116. start_address = start_variable.address
  117. end_address = end_variable.address + (1 if end_variable.is_32_bit else 0)
  118. count = end_address - start_address + 1
  119. byte_count = count * 2
  120. if byte_count > 255:
  121. raise Exception(f"address range is too large")
  122. if count > 1:
  123. function_code = FunctionCodes.WRITE_MEMORY_RANGE
  124. header = bytes([
  125. device_id,
  126. function_code.value,
  127. start_address >> 8,
  128. start_address & 0xFF,
  129. count >> 8,
  130. count & 0xFF,
  131. byte_count,
  132. ])
  133. else:
  134. if FunctionCodes.WRITE_STATUS_REGISTER.value in results[0].function_codes:
  135. function_code = FunctionCodes.WRITE_STATUS_REGISTER
  136. else:
  137. function_code = FunctionCodes.WRITE_MEMORY_SINGLE
  138. header = bytes([
  139. device_id,
  140. function_code.value,
  141. start_address >> 8,
  142. start_address & 0xFF,
  143. ])
  144. if not all(function_code.value in x.function_codes for x in results):
  145. raise Exception(f"function code {function_code.name} is not supported for all addresses")
  146. data = bytearray(byte_count)
  147. for result in results:
  148. offset = (result.address - start_address) * 2
  149. self.value_to_bytes(result, data, offset, result.value)
  150. result = header + bytes(data)
  151. return start_address, result + crc16(result)
  152. def is_complete(self, buffer: bytes) -> bool:
  153. if len(buffer) < 4:
  154. return False
  155. device_id = buffer[0]
  156. if not buffer[1] in FunctionCodes._value2member_map_:
  157. return False
  158. function_code = FunctionCodes(buffer[1])
  159. if function_code in [FunctionCodes.READ_MEMORY, FunctionCodes.READ_PARAMETER, FunctionCodes.READ_STATUS_REGISTER]:
  160. data_length = buffer[2]
  161. return len(buffer) >= data_length + 5
  162. else:
  163. return len(buffer) >= 8
  164. def parse(self, start_address: int, buffer: bytes) -> ResultContainer:
  165. self.device_id = buffer[0]
  166. function_code = FunctionCodes(buffer[1])
  167. results = []
  168. if function_code in [FunctionCodes.READ_MEMORY, FunctionCodes.READ_PARAMETER, FunctionCodes.READ_STATUS_REGISTER]:
  169. data_length = buffer[2]
  170. received_crc = buffer[3+data_length:3+data_length+2]
  171. calculated_crc = crc16(buffer[:3+data_length])
  172. if received_crc != calculated_crc:
  173. raise Exception(f"CRC mismatch (0x{calculated_crc.hex()} != 0x{received_crc.hex()})")
  174. address = start_address
  175. cursor = 3
  176. while cursor < data_length + 3:
  177. items = [v for v in variables if address == v.address and function_code.value in v.function_codes]
  178. for variable in items:
  179. value = self.bytes_to_value(variable, buffer, cursor)
  180. results.append(Result(**vars(variable), value=value))
  181. cursor += 2
  182. address += 1
  183. else:
  184. address = struct.unpack_from('>H', buffer, 2)[0]
  185. if address != start_address:
  186. raise Exception(f"Write result address mismatch ({hex(address)} != {hex(start_address)})")
  187. received_crc = buffer[6:8]
  188. calculated_crc = crc16(buffer[:6])
  189. if received_crc != calculated_crc:
  190. raise Exception(f"CRC mismatch (0x{calculated_crc.hex()} != 0x{received_crc.hex()})")
  191. if function_code in [FunctionCodes.WRITE_MEMORY_SINGLE, FunctionCodes.WRITE_STATUS_REGISTER]:
  192. variable = [v for v in variables if address == v.address and function_code.value in v.function_codes][0]
  193. value = self.bytes_to_value(variable, buffer, 4)
  194. results.append(Result(**vars(variable), value=value))
  195. return ResultContainer(results)
  196. def _find_raw_value_by_brute_force(self, variable: Variable, value: str):
  197. if variable.multiplier:
  198. value = float(value) * variable.multiplier
  199. n_bits = 32 if variable.is_32_bit else 16
  200. if variable.is_signed:
  201. for i in range(0, 2**(n_bits-1) + 1):
  202. try:
  203. if variable.func(i) == value:
  204. return i
  205. except IndexError:
  206. pass
  207. for i in range(0, -2**(n_bits-1) - 2, -1):
  208. try:
  209. if variable.func(i) == value:
  210. return i
  211. except IndexError:
  212. pass
  213. else:
  214. for i in range(0, 2**n_bits + 1):
  215. try:
  216. if variable.func(i) == value:
  217. return i
  218. except IndexError:
  219. pass
  220. return None