bleclient.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. import asyncio
  2. import struct
  3. from bleak import BleakClient, BleakScanner
  4. from bleak.backends.characteristic import BleakGATTCharacteristic
  5. from crc import crc16
  6. class BleClient:
  7. DEVICE_NAME_UUID = "00002a00-0000-1000-8000-00805f9b34fb"
  8. NOTIFY_UUID = "0000ff01-0000-1000-8000-00805f9b34fb"
  9. WRITE_UUID = "0000ff02-0000-1000-8000-00805f9b34fb"
  10. buffer = bytearray()
  11. def __init__(self, mac_address: str):
  12. self.client = BleakClient(mac_address)
  13. self.details_queue = asyncio.Queue() # Queue to store the received details
  14. async def __aenter__(self):
  15. await self.client.connect() # Connect to the BLE device
  16. await self.client.start_notify(self.NOTIFY_UUID, self.notification_handler) # Start receiving notifications
  17. return self
  18. async def __aexit__(self, exc_type, exc, tb):
  19. await self.client.stop_notify(self.NOTIFY_UUID) # Stop receiving notifications
  20. await self.client.disconnect() # Disconnect from the BLE device
  21. async def write(self, cmd: int):
  22. data = cmd.to_bytes(8, 'big') # Convert the command to a byte array
  23. crc = data[-2:] # Extract the CRC from the data
  24. values = data[:-2] # Extract the values from the data
  25. crc2 = crc16(values) # Calculate the CRC of the values
  26. if crc != crc2:
  27. # If the calculated CRC doesn't match the extracted CRC, replace it with the calculated CRC
  28. data = values + crc2
  29. #print("write ", self.WRITE_UUID, data.hex())
  30. await self.client.write_gatt_char(self.WRITE_UUID, data) # Write the data to the BLE device
  31. async def notification_handler(self, characteristic: BleakGATTCharacteristic, data: bytearray):
  32. if characteristic.uuid != self.NOTIFY_UUID:
  33. return
  34. self.buffer += data # Append the received data to the buffer
  35. if len(self.buffer) < 3:
  36. return
  37. crc = self.buffer[-2:] # Extract the CRC from the buffer
  38. values = data[:-2] # Extract the values from the buffer
  39. crc2 = crc16(values) # Calculate the CRC of the values
  40. if crc != crc2:
  41. # If the calculated CRC doesn't match the extracted CRC, ignore the data
  42. pass
  43. if self.buffer[0] != 0x01:
  44. print("invalid start byte", self.buffer.hex())
  45. self.buffer = bytearray()
  46. return
  47. if len(self.buffer) == 91:
  48. response = BleClient.parse_details_response(self.buffer) # Parse the details response from the buffer
  49. self.details_queue.put_nowait(response) # Add the parsed response to the queue
  50. self.buffer = bytearray()
  51. if len(self.buffer) >= 91:
  52. print(f"received too many bytes ({len(self.buffer)})")
  53. self.buffer = bytearray() # Clear the buffer
  54. async def list_services(self):
  55. for service in self.client.services:
  56. print("[Service] %s", service)
  57. for char in service.characteristics:
  58. print(" [Characteristic] ", char, ",".join(char.properties))
  59. for descriptor in char.descriptors:
  60. try:
  61. value = await self.client.read_gatt_descriptor(descriptor.handle)
  62. print(" [Descriptor] ", descriptor, value)
  63. except Exception as e:
  64. print(" [Descriptor] ", descriptor, e)
  65. async def get_device_name(self):
  66. device_name = await self.client.read_gatt_char(self.DEVICE_NAME_UUID) # Read the device name from the BLE device
  67. return "".join(map(chr, device_name))
  68. async def request_details(self):
  69. self.details_queue = asyncio.Queue() # Clear the queue
  70. i = 0
  71. while self.details_queue.empty() and i < 10:
  72. i += 1
  73. await self.write(0xFE043030002bbf1a) # Send a request for details to the BLE device
  74. await asyncio.sleep(0.1) # Wait for the response to be received
  75. return await self.details_queue.get() # Return the first item in the queue
  76. @staticmethod
  77. def solar_panel_charge_state(v: int):
  78. if 0:
  79. return "invalid"
  80. elif 1:
  81. return "float_charge"
  82. elif 2:
  83. return "boost_charge"
  84. elif 3:
  85. return "equal_charge"
  86. else:
  87. return "fault"
  88. @staticmethod
  89. def load_discharge_state(v: int):
  90. fault = v & 2 == 1
  91. if not fault and v & 1:
  92. return "enabled"
  93. elif not fault:
  94. return "disabled"
  95. elif (v >> 2) & 1:
  96. return "over_temperature"
  97. elif (v >> 3) & 1:
  98. return "open_circuit_protection"
  99. elif (v >> 4) & 1:
  100. return "hardware_protection"
  101. elif (v >> 11) & 1:
  102. return "short_circuit_protection"
  103. else:
  104. return str((v >> 12) & 0b11)
  105. @staticmethod
  106. def parse_details_response(data):
  107. if len(data) != 91:
  108. return None
  109. subdata = data[3:-2]
  110. return {
  111. "equipment_id": struct.unpack_from(">H", subdata, 0)[0],
  112. "run_days": struct.unpack_from(">H", subdata, 2)[0],
  113. "battery_full_level": struct.unpack_from(">H", subdata, 4)[0] / 100,
  114. "battery_state_1": struct.unpack_from(">H", subdata, 6)[0] & 0xF0 >> 4,
  115. "battery_state_2": struct.unpack_from(">H", subdata, 6)[0] & 0x0F,
  116. "solar_panel_is_charging": struct.unpack_from(">H", subdata, 8)[0] & 1 > 0,
  117. "solar_panel_is_night": struct.unpack_from(">H", subdata, 8)[0] & 32 > 0,
  118. "solar_panel_charge_state": BleClient.solar_panel_charge_state(struct.unpack_from(">H", subdata, 8)[0] & 0b1100 >> 2),
  119. "solar_panel_state": struct.unpack_from(">H", subdata, 8)[0] & 16 > 0,
  120. "load_is_enabled": struct.unpack_from(">H", subdata, 10)[0] & 1 > 0,
  121. "load_state": BleClient.load_discharge_state(struct.unpack_from(">H", subdata, 10)[0]),
  122. "temperature_1": struct.unpack_from(">H", subdata, 12)[0] / 100,
  123. "temperature_2": struct.unpack_from(">H", subdata, 14)[0] / 100,
  124. "battery_empty_times": struct.unpack_from(">H", subdata, 16)[0],
  125. "battery_full_times": struct.unpack_from(">H", subdata, 18)[0],
  126. "battery_percentage": struct.unpack_from(">H", subdata, 42)[0],
  127. "battery_voltage": struct.unpack_from(">H", subdata, 44)[0] / 100,
  128. "battery_current": struct.unpack_from(">h", subdata, 46)[0] / 100,
  129. "battery_power": (struct.unpack_from(">H", subdata, 48)[0] + struct.unpack_from(">h", subdata, 50)[0] * 0x100) / 100,
  130. "load_voltage": struct.unpack_from(">H", subdata, 52)[0] / 100,
  131. "load_current": struct.unpack_from(">H", subdata, 54)[0] / 100,
  132. "load_power": (struct.unpack_from(">H", subdata, 56)[0] | struct.unpack_from(">H", subdata, 58)[0] << 16) / 100,
  133. "solar_panel_voltage": struct.unpack_from(">H", subdata, 60)[0] / 100,
  134. "solar_panel_current": struct.unpack_from(">H", subdata, 62)[0] / 100,
  135. "solar_panel_power": (struct.unpack_from(">H", subdata, 64)[0] | struct.unpack_from(">H", subdata, 66)[0] << 16) / 100,
  136. "solar_panel_daily_energy": struct.unpack_from(">H", subdata, 68)[0] / 100,
  137. "solar_panel_total_energy": (struct.unpack_from(">H", subdata, 70)[0] | struct.unpack_from(">H", subdata, 72)[0] << 16) / 100,
  138. "load_daily_energy": struct.unpack_from(">H", subdata, 74)[0] / 100,
  139. "load_total_energy": struct.unpack_from(">I", subdata, 78)[0] / 100,
  140. }
  141. # Map the keys to their respective units of measurement
  142. @staticmethod
  143. def get_unit_of_measurement(key):
  144. unit_mapping = {
  145. "solar_panel_is_charging": None,
  146. "solar_panel_is_night": None,
  147. "solar_panel_charge_state": None,
  148. "solar_panel_state": None,
  149. "load_is_enabled": None,
  150. "load_state": None,
  151. "temperature_1": "°C",
  152. "temperature_2": "°C",
  153. "battery_percentage": "%",
  154. "battery_voltage": "V",
  155. "battery_current": "A",
  156. "battery_power": "W",
  157. "load_voltage": "V",
  158. "load_current": "A",
  159. "load_power": "W",
  160. "solar_panel_voltage": "V",
  161. "solar_panel_current": "A",
  162. "solar_panel_power": "W",
  163. "solar_panel_daily_energy": "kWh",
  164. "solar_panel_total_energy": "kWh",
  165. "load_daily_energy": "kWh",
  166. "load_total_energy": "kWh",
  167. }
  168. return unit_mapping.get(key, None)