protocol.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. import struct
  2. from dataclasses import dataclass
  3. from typing import Any, List, Union, Tuple, Optional
  4. from .variables import variables, Variable, FunctionCodes
  5. from .crc import crc16
  6. type Value = str|int|float
  7. @dataclass
  8. class Result(Variable):
  9. value: Value
  10. class ResultContainer:
  11. def __init__(self, results: List[Result]):
  12. self._results = results
  13. self._result_map = {res.name: res for res in results}
  14. def __getitem__(self, key: Union[int, str]) -> Result:
  15. if isinstance(key, int):
  16. return self._results[key]
  17. elif isinstance(key, str):
  18. return self._result_map[key]
  19. else:
  20. raise TypeError("Key must be an integer index or a result name string.")
  21. def __len__(self):
  22. return len(self._results)
  23. def __iter__(self):
  24. return iter(self._results)
  25. def __bool__(self):
  26. return len(self._results) > 0
  27. def items(self):
  28. return self._result_map.items()
  29. def get(self, key: str) -> Optional[Result]:
  30. if isinstance(key, str):
  31. return self._result_map.get(key)
  32. else:
  33. raise TypeError("Key must be a variable name string.")
  34. class LumiaxClient:
  35. def __init__(self):
  36. self.device_id = 0xFE
  37. def bytes_to_value(self, variable: Variable, buffer: bytes, offset: int) -> Value:
  38. if variable.is_32_bit and variable.is_signed:
  39. raw_value = struct.unpack_from(">H", buffer, offset)[0] | struct.unpack_from(">h", buffer, offset + 2)[0] << 16
  40. elif variable.is_32_bit:
  41. raw_value = struct.unpack_from(">H", buffer, offset)[0] | struct.unpack_from(">H", buffer, offset + 2)[0] << 16
  42. elif variable.is_signed:
  43. raw_value = struct.unpack_from(">h", buffer, offset)[0]
  44. else:
  45. raw_value = struct.unpack_from(">H", buffer, offset)[0]
  46. if variable.multiplier:
  47. value = raw_value / variable.multiplier
  48. elif variable.func:
  49. try:
  50. value = variable.func(raw_value)
  51. except IndexError as e:
  52. raise Exception(f"unexpected value for {variable.name} ({hex(variable.address)}): '{raw_value}'")
  53. else:
  54. value = raw_value
  55. return value
  56. def value_to_bytes(self, variable: Variable, buffer: bytearray, offset: int, value: Value) -> int:
  57. if variable.multiplier and not variable.func:
  58. raw_value = round(float(value) * variable.multiplier)
  59. elif variable.func:
  60. raw_value = self._find_raw_value_by_brute_force(variable, value)
  61. if raw_value == None:
  62. raise Exception(f"invalid value for {variable.name}: '{value}'")
  63. elif variable.binary_payload and value == variable.binary_payload[0]:
  64. raw_value = 1
  65. elif variable.binary_payload and value == variable.binary_payload[1]:
  66. raw_value = 0
  67. elif variable.binary_payload:
  68. raise Exception(f"invalid binary value for {variable.name}: '{value}'")
  69. else:
  70. raw_value = int(value)
  71. if variable.is_32_bit and variable.is_signed:
  72. struct.pack_into(">H", buffer, offset, raw_value & 0xFFFF)
  73. struct.pack_into(">h", buffer, offset + 2, raw_value >> 16)
  74. elif variable.is_32_bit:
  75. struct.pack_into(">H", buffer, offset, raw_value & 0xFFFF)
  76. struct.pack_into(">H", buffer, offset + 2, raw_value >> 16)
  77. elif variable.is_signed:
  78. struct.pack_into(">h", buffer, offset, raw_value)
  79. else:
  80. struct.pack_into(">H", buffer, offset, raw_value)
  81. length = 4 if variable.is_32_bit else 2
  82. return offset + length
  83. def get_read_command(self, device_id: int, start_address: int, count: int) -> bytes:
  84. items = [v for v in variables if v.address >= start_address and v.address < start_address + count]
  85. if not items:
  86. raise Exception(f"the range {hex(start_address)}-{hex(start_address+count-1)} contains no variables")
  87. function_code = items[0].function_codes[0]
  88. if not all(function_code in v.function_codes for v in items):
  89. raise Exception(f"the range {hex(start_address)}-{hex(start_address+count-1)} spans multiple function codes")
  90. result = bytes([
  91. device_id,
  92. function_code,
  93. start_address >> 8,
  94. start_address & 0xFF,
  95. count >> 8,
  96. count & 0xFF
  97. ])
  98. return result + crc16(result)
  99. def get_write_command(self, device_id: int, results: list[Result]) -> Tuple[int, bytes]:
  100. if not results:
  101. raise Exception(f"values list is empty")
  102. results.sort(key=lambda x: x.address)
  103. address = results[0].address
  104. for result in results:
  105. if result.value is None:
  106. raise Exception(f"value of {result.name} ({hex(result.address)}) is empty")
  107. if address < result.address:
  108. raise Exception(f"variables are not continuous at {hex(result.address)}")
  109. address = result.address + (2 if result.is_32_bit else 1)
  110. start_variable = results[0]
  111. end_variable = results[-1]
  112. start_address = start_variable.address
  113. end_address = end_variable.address + (1 if end_variable.is_32_bit else 0)
  114. count = end_address - start_address + 1
  115. byte_count = count * 2
  116. if byte_count > 255:
  117. raise Exception(f"address range is too large")
  118. if count > 1:
  119. function_code = FunctionCodes.WRITE_MEMORY_RANGE
  120. header = bytes([
  121. device_id,
  122. function_code.value,
  123. start_address >> 8,
  124. start_address & 0xFF,
  125. count >> 8,
  126. count & 0xFF,
  127. byte_count,
  128. ])
  129. else:
  130. if FunctionCodes.WRITE_STATUS_REGISTER.value in results[0].function_codes:
  131. function_code = FunctionCodes.WRITE_STATUS_REGISTER
  132. else:
  133. function_code = FunctionCodes.WRITE_MEMORY_SINGLE
  134. header = bytes([
  135. device_id,
  136. function_code.value,
  137. start_address >> 8,
  138. start_address & 0xFF,
  139. ])
  140. if not all(function_code.value in x.function_codes for x in results):
  141. raise Exception(f"function code {function_code.name} is not supported for all addresses")
  142. data = bytearray(byte_count)
  143. for result in results:
  144. offset = (result.address - start_address) * 2
  145. self.value_to_bytes(result, data, offset, result.value)
  146. result = header + bytes(data)
  147. return start_address, result + crc16(result)
  148. def is_complete(self, buffer: bytes) -> bool:
  149. if len(buffer) < 4:
  150. return False
  151. device_id = buffer[0]
  152. function_code = FunctionCodes(buffer[1])
  153. if function_code in [FunctionCodes.READ_MEMORY, FunctionCodes.READ_PARAMETER, FunctionCodes.READ_STATUS_REGISTER]:
  154. data_length = buffer[2]
  155. return len(buffer) >= data_length + 5
  156. else:
  157. return len(buffer) >= 8
  158. def parse(self, start_address: int, buffer: bytes) -> ResultContainer:
  159. self.device_id = buffer[0]
  160. function_code = FunctionCodes(buffer[1])
  161. results = []
  162. if function_code in [FunctionCodes.READ_MEMORY, FunctionCodes.READ_PARAMETER, FunctionCodes.READ_STATUS_REGISTER]:
  163. data_length = buffer[2]
  164. received_crc = buffer[3+data_length:3+data_length+2]
  165. calculated_crc = crc16(buffer[:3+data_length])
  166. if received_crc != calculated_crc:
  167. raise Exception(f"CRC mismatch (0x{calculated_crc.hex()} != 0x{received_crc.hex()})")
  168. address = start_address
  169. cursor = 3
  170. while cursor < data_length + 3:
  171. items = [v for v in variables if address == v.address and function_code.value in v.function_codes]
  172. for variable in items:
  173. value = self.bytes_to_value(variable, buffer, cursor)
  174. results.append(Result(**vars(variable), value=value))
  175. cursor += 2
  176. address += 1
  177. else:
  178. address = struct.unpack_from('>H', buffer, 2)[0]
  179. if address != start_address:
  180. raise Exception(f"Write result address mismatch ({hex(address)} != {hex(start_address)})")
  181. received_crc = buffer[6:8]
  182. calculated_crc = crc16(buffer[:6])
  183. if received_crc != calculated_crc:
  184. raise Exception(f"CRC mismatch (0x{calculated_crc.hex()} != 0x{received_crc.hex()})")
  185. if function_code in [FunctionCodes.WRITE_MEMORY_SINGLE, FunctionCodes.WRITE_STATUS_REGISTER]:
  186. variable = [v for v in variables if address == v.address and function_code.value in v.function_codes][0]
  187. value = self.bytes_to_value(variable, buffer, 4)
  188. results.append(Result(**vars(variable), value=value))
  189. return ResultContainer(results)
  190. def _find_raw_value_by_brute_force(self, variable: Variable, value: str):
  191. if variable.multiplier:
  192. value = float(value) * variable.multiplier
  193. n_bits = 32 if variable.is_32_bit else 16
  194. if variable.is_signed:
  195. for i in range(0, 2**(n_bits-1) + 1):
  196. try:
  197. if variable.func(i) == value:
  198. return i
  199. except IndexError:
  200. pass
  201. for i in range(0, -2**(n_bits-1) - 2, -1):
  202. try:
  203. if variable.func(i) == value:
  204. return i
  205. except IndexError:
  206. pass
  207. else:
  208. for i in range(0, 2**n_bits + 1):
  209. try:
  210. if variable.func(i) == value:
  211. return i
  212. except IndexError:
  213. pass
  214. return None