bleclient.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. import asyncio
  2. import struct
  3. from bleak import BleakClient, BleakScanner
  4. from bleak.backends.characteristic import BleakGATTCharacteristic
  5. from crc import crc16
  6. class BleClient:
  7. DEVICE_NAME_UUID = "00002a00-0000-1000-8000-00805f9b34fb"
  8. NOTIFY_UUID = "0000ff01-0000-1000-8000-00805f9b34fb"
  9. WRITE_UUID = "0000ff02-0000-1000-8000-00805f9b34fb"
  10. buffer = bytearray()
  11. def __init__(self, mac_address: str):
  12. self.client = BleakClient(mac_address)
  13. self.details_queue = asyncio.Queue() # Queue to store the received details
  14. async def __aenter__(self):
  15. await self.client.connect() # Connect to the BLE device
  16. await self.client.start_notify(self.NOTIFY_UUID, self.notification_handler) # Start receiving notifications
  17. return self
  18. async def __aexit__(self, exc_type, exc, tb):
  19. await self.client.stop_notify(self.NOTIFY_UUID) # Stop receiving notifications
  20. await self.client.disconnect() # Disconnect from the BLE device
  21. async def write(self, cmd: int):
  22. data = cmd.to_bytes(8, 'big') # Convert the command to a byte array
  23. crc = data[-2:] # Extract the CRC from the data
  24. values = data[:-2] # Extract the values from the data
  25. crc2 = crc16(values) # Calculate the CRC of the values
  26. if crc != crc2:
  27. # If the calculated CRC doesn't match the extracted CRC, replace it with the calculated CRC
  28. data = values + crc2
  29. #print("write ", self.WRITE_UUID, data.hex())
  30. await self.client.write_gatt_char(self.WRITE_UUID, data) # Write the data to the BLE device
  31. async def notification_handler(self, characteristic: BleakGATTCharacteristic, data: bytearray):
  32. if characteristic.uuid != self.NOTIFY_UUID:
  33. return
  34. self.buffer += data # Append the received data to the buffer
  35. if len(self.buffer) < 3:
  36. return
  37. crc = self.buffer[-2:] # Extract the CRC from the buffer
  38. values = data[:-2] # Extract the values from the buffer
  39. crc2 = crc16(values) # Calculate the CRC of the values
  40. if crc != crc2:
  41. # If the calculated CRC doesn't match the extracted CRC, ignore the data
  42. pass
  43. if self.buffer[0] != 0x01:
  44. print("invalid start byte", self.buffer.hex())
  45. self.buffer = bytearray()
  46. return
  47. if len(self.buffer) == 91:
  48. response = BleClient.parse_details_response(self.buffer) # Parse the details response from the buffer
  49. self.details_queue.put_nowait(response) # Add the parsed response to the queue
  50. self.buffer = bytearray()
  51. if len(self.buffer) >= 91:
  52. print(f"received too many bytes ({len(self.buffer)})")
  53. self.buffer = bytearray() # Clear the buffer
  54. async def list_services(self):
  55. for service in self.client.services:
  56. print("[Service] %s", service)
  57. for char in service.characteristics:
  58. print(" [Characteristic] ", char, ",".join(char.properties))
  59. for descriptor in char.descriptors:
  60. try:
  61. value = await self.client.read_gatt_descriptor(descriptor.handle)
  62. print(" [Descriptor] ", descriptor, value)
  63. except Exception as e:
  64. print(" [Descriptor] ", descriptor, e)
  65. async def get_device_name(self):
  66. device_name = await self.client.read_gatt_char(self.DEVICE_NAME_UUID) # Read the device name from the BLE device
  67. return "".join(map(chr, device_name))
  68. async def request_details(self):
  69. self.details_queue = asyncio.Queue() # Clear the queue
  70. i = 0
  71. while self.details_queue.empty() and i < 10:
  72. i += 1
  73. await self.write(0xFE043030002bbf1a) # Send a request for details to the BLE device
  74. try:
  75. # Wait for either a response or timeout
  76. return await asyncio.wait_for(self.details_queue.get(), timeout=5)
  77. except asyncio.TimeoutError:
  78. pass
  79. return None
  80. @staticmethod
  81. def solar_panel_charge_state(v: int):
  82. if 0:
  83. return "invalid"
  84. elif 1:
  85. return "float_charge"
  86. elif 2:
  87. return "boost_charge"
  88. elif 3:
  89. return "equal_charge"
  90. else:
  91. return "fault"
  92. @staticmethod
  93. def load_discharge_state(v: int):
  94. fault = v & 2 == 1
  95. if not fault and v & 1:
  96. return "enabled"
  97. elif not fault:
  98. return "disabled"
  99. elif (v >> 2) & 1:
  100. return "over_temperature"
  101. elif (v >> 3) & 1:
  102. return "open_circuit_protection"
  103. elif (v >> 4) & 1:
  104. return "hardware_protection"
  105. elif (v >> 11) & 1:
  106. return "short_circuit_protection"
  107. else:
  108. return str((v >> 12) & 0b11)
  109. @staticmethod
  110. def parse_details_response(data):
  111. if len(data) != 91:
  112. return None
  113. subdata = data[3:-2]
  114. return {
  115. "equipment_id": struct.unpack_from(">H", subdata, 0)[0],
  116. "run_days": struct.unpack_from(">H", subdata, 2)[0],
  117. "battery_full_level": struct.unpack_from(">H", subdata, 4)[0] / 100,
  118. "battery_state_1": struct.unpack_from(">H", subdata, 6)[0] & 0xF0 >> 4,
  119. "battery_state_2": struct.unpack_from(">H", subdata, 6)[0] & 0x0F,
  120. "solar_panel_is_charging": struct.unpack_from(">H", subdata, 8)[0] & 1 > 0,
  121. "solar_panel_is_night": struct.unpack_from(">H", subdata, 8)[0] & 32 > 0,
  122. "solar_panel_charge_state": BleClient.solar_panel_charge_state(struct.unpack_from(">H", subdata, 8)[0] & 0b1100 >> 2),
  123. "solar_panel_state": struct.unpack_from(">H", subdata, 8)[0] & 16 > 0,
  124. "load_is_enabled": struct.unpack_from(">H", subdata, 10)[0] & 1 > 0,
  125. "load_state": BleClient.load_discharge_state(struct.unpack_from(">H", subdata, 10)[0]),
  126. "temperature_1": struct.unpack_from(">H", subdata, 12)[0] / 100,
  127. "temperature_2": struct.unpack_from(">H", subdata, 14)[0] / 100,
  128. "battery_empty_times": struct.unpack_from(">H", subdata, 16)[0],
  129. "battery_full_times": struct.unpack_from(">H", subdata, 18)[0],
  130. "battery_percentage": struct.unpack_from(">H", subdata, 42)[0],
  131. "battery_voltage": struct.unpack_from(">H", subdata, 44)[0] / 100,
  132. "battery_current": struct.unpack_from(">h", subdata, 46)[0] / 100,
  133. "battery_power": (struct.unpack_from(">H", subdata, 48)[0] + struct.unpack_from(">h", subdata, 50)[0] * 0x100) / 100,
  134. "load_voltage": struct.unpack_from(">H", subdata, 52)[0] / 100,
  135. "load_current": struct.unpack_from(">H", subdata, 54)[0] / 100,
  136. "load_power": (struct.unpack_from(">H", subdata, 56)[0] | struct.unpack_from(">H", subdata, 58)[0] << 16) / 100,
  137. "solar_panel_voltage": struct.unpack_from(">H", subdata, 60)[0] / 100,
  138. "solar_panel_current": struct.unpack_from(">H", subdata, 62)[0] / 100,
  139. "solar_panel_power": (struct.unpack_from(">H", subdata, 64)[0] | struct.unpack_from(">H", subdata, 66)[0] << 16) / 100,
  140. "solar_panel_daily_energy": struct.unpack_from(">H", subdata, 68)[0] / 100,
  141. "solar_panel_total_energy": (struct.unpack_from(">H", subdata, 70)[0] | struct.unpack_from(">H", subdata, 72)[0] << 16) / 100,
  142. "load_daily_energy": struct.unpack_from(">H", subdata, 74)[0] / 100,
  143. "load_total_energy": struct.unpack_from(">I", subdata, 78)[0] / 100,
  144. }
  145. # Map the keys to their respective units of measurement
  146. @staticmethod
  147. def get_unit_of_measurement(key):
  148. unit_mapping = {
  149. "solar_panel_is_charging": None,
  150. "solar_panel_is_night": None,
  151. "solar_panel_charge_state": None,
  152. "solar_panel_state": None,
  153. "load_is_enabled": None,
  154. "load_state": None,
  155. "temperature_1": "°C",
  156. "temperature_2": "°C",
  157. "battery_percentage": "%",
  158. "battery_voltage": "V",
  159. "battery_current": "A",
  160. "battery_power": "W",
  161. "load_voltage": "V",
  162. "load_current": "A",
  163. "load_power": "W",
  164. "solar_panel_voltage": "V",
  165. "solar_panel_current": "A",
  166. "solar_panel_power": "W",
  167. "solar_panel_daily_energy": "kWh",
  168. "solar_panel_total_energy": "kWh",
  169. "load_daily_energy": "kWh",
  170. "load_total_energy": "kWh",
  171. }
  172. return unit_mapping.get(key, None)