Bladeren bron

use lumiax client

subDesTagesMitExtraKaese 1 maand geleden
bovenliggende
commit
a1b1a1e798
2 gewijzigde bestanden met toevoegingen van 38 en 153 verwijderingen
  1. 6 6
      main.py
  2. 32 147
      src/bleclient.py

+ 6 - 6
main.py

@@ -9,12 +9,12 @@ import aiomqtt
 from bleak import BleakScanner
 from bleak.exc import BleakError, BleakDeviceNotFoundError
 
-from src.bleclient import BleClient
+from src.bleclient import BleClient, Result
 
 send_config = True
 reconnect_interval = 5  # In seconds
 
-async def mqtt_publish(details: dict[str, any], client: aiomqtt.Client):
+async def mqtt_publish(details: dict[str, Result], client: aiomqtt.Client):
     global send_config
     # Define the base topic for MQTT Discovery
     base_topic = "homeassistant"
@@ -33,11 +33,11 @@ async def mqtt_publish(details: dict[str, any], client: aiomqtt.Client):
 
         # Create the MQTT Discovery payload
         payload = {
-            "name": f"Solarlife {key.replace('_', ' ').title()}",
+            "name": f"Solarlife {value.friendly_name}",
             "device": device_info,
             "unique_id": f"solarlife_{key}",
             "state_topic": state_topic,
-            "unit_of_measurement": BleClient.get_unit_of_measurement(key)
+            "unit_of_measurement": value.unit,
         }
         if "daily_energy" in key:
             payload['device_class'] = "energy"
@@ -67,7 +67,7 @@ async def mqtt_publish(details: dict[str, any], client: aiomqtt.Client):
             await client.publish(topic, payload=json.dumps(payload), retain=True)
 
         # Publish the entity state
-        await client.publish(state_topic, payload=str(value))
+        await client.publish(state_topic, payload=str(value.value))
     send_config = False
     
 async def main(address, host, port, username, password):
@@ -82,7 +82,7 @@ async def main(address, host, port, username, password):
                                 while True:
                                     details = await mppt.request_details()
                                     if details:
-                                        print(f"Battery: {details['battery_percentage']}% ({details['battery_voltage']}V)")
+                                        print(f"Battery: {details['battery_percentage'].value}% ({details['battery_voltage'].value}V)")
                                         await mqtt_publish(details, client)
                                     else:
                                         print("No values recieved")

+ 32 - 147
src/bleclient.py

@@ -3,9 +3,10 @@ import struct
 from bleak import BleakClient, BleakScanner
 from bleak.backends.characteristic import BleakGATTCharacteristic
 
-from crc import crc16
+from src.crc import crc16
+from src.protocol import LumiaxClient, Result
 
-class BleClient:
+class BleClient(LumiaxClient):
     DEVICE_NAME_UUID = "00002a00-0000-1000-8000-00805f9b34fb"
     NOTIFY_UUID = "0000ff01-0000-1000-8000-00805f9b34fb"
     WRITE_UUID = "0000ff02-0000-1000-8000-00805f9b34fb"
@@ -14,7 +15,8 @@ class BleClient:
 
     def __init__(self, mac_address: str):
         self.client = BleakClient(mac_address)
-        self.details_queue = asyncio.Queue()  # Queue to store the received details
+        self.response_queue = asyncio.Queue()
+        self.lock = asyncio.Lock()
 
     async def __aenter__(self):
         await self.client.connect()  # Connect to the BLE device
@@ -25,41 +27,38 @@ class BleClient:
         await self.client.stop_notify(self.NOTIFY_UUID)  # Stop receiving notifications
         await self.client.disconnect()  # Disconnect from the BLE device
 
-    async def write(self, cmd: int):
-        data = cmd.to_bytes(8, 'big')  # Convert the command to a byte array
-        crc = data[-2:]  # Extract the CRC from the data
-        values = data[:-2]  # Extract the values from the data
-        crc2 = crc16(values)  # Calculate the CRC of the values
-        if crc != crc2:
-            # If the calculated CRC doesn't match the extracted CRC, replace it with the calculated CRC
-            data = values + crc2
-        #print("write ", self.WRITE_UUID, data.hex())
-        await self.client.write_gatt_char(self.WRITE_UUID, data)  # Write the data to the BLE device
-
     async def notification_handler(self, characteristic: BleakGATTCharacteristic, data: bytearray):
         if characteristic.uuid != self.NOTIFY_UUID:
             return
         self.buffer += data  # Append the received data to the buffer
-        if len(self.buffer) < 3:
+        if not self.is_complete(self.buffer):
             return
+        results = self.parse(self.start_address, buffer)
+        self.response_queue.put_nowait({r.name: r for r in results})
 
-        crc = self.buffer[-2:]  # Extract the CRC from the buffer
-        values = data[:-2]  # Extract the values from the buffer
-        crc2 = crc16(values)  # Calculate the CRC of the values
-        if crc != crc2:
-            # If the calculated CRC doesn't match the extracted CRC, ignore the data
-            pass
-        if self.buffer[0] != 0x01:
-            print("invalid start byte", self.buffer.hex())
-            self.buffer = bytearray()
-            return
-        if len(self.buffer) == 91:
-            response = BleClient.parse_details_response(self.buffer)  # Parse the details response from the buffer
-            self.details_queue.put_nowait(response)  # Add the parsed response to the queue
-            self.buffer = bytearray()
-        if len(self.buffer) >= 91:
-            print(f"received too many bytes ({len(self.buffer)})")
-            self.buffer = bytearray()  # Clear the buffer
+    async def read(self, start_address: int, count: int, repeat = 10, timeout = 5) -> dict[str, Result]:
+        async with self.lock:
+            self.start_address = start_address
+            command = self.get_read_command(0xFE, start_address, count)
+            self.response_queue = asyncio.Queue() # Clear the queue
+            i = 0
+            # send the command multiple times
+            while self.response_queue.empty() and i < repeat:
+                i += 1
+                await self.client.write_gatt_char(self.WRITE_UUID, command)
+                try:
+                    # Wait for either a response or timeout
+                    return await asyncio.wait_for(self.response_queue.get(), timeout=timeout)
+                except asyncio.TimeoutError:
+                    pass
+            return None
+
+    async def request_details(self) -> dict[str, Result]:
+        return await self.read(0x3030, 43)
+
+    async def get_device_name(self):
+        device_name = await self.client.read_gatt_char(self.DEVICE_NAME_UUID)  # Read the device name from the BLE device
+        return "".join(map(chr, device_name))
 
     async def list_services(self):
         for service in self.client.services:
@@ -71,118 +70,4 @@ class BleClient:
                         value = await self.client.read_gatt_descriptor(descriptor.handle)
                         print("    [Descriptor] ", descriptor, value)
                     except Exception as e:
-                        print("    [Descriptor] ", descriptor, e)
-
-    async def get_device_name(self):
-        device_name = await self.client.read_gatt_char(self.DEVICE_NAME_UUID)  # Read the device name from the BLE device
-        return "".join(map(chr, device_name))
-
-    async def request_details(self):
-        self.details_queue = asyncio.Queue()  # Clear the queue
-        i = 0
-        while self.details_queue.empty() and i < 10:
-            i += 1
-            await self.write(0xFE043030002bbf1a)  # Send a request for details to the BLE device
-            try:
-                # Wait for either a response or timeout
-                return await asyncio.wait_for(self.details_queue.get(), timeout=5)
-            except asyncio.TimeoutError:
-                pass
-        return None
-
-    @staticmethod
-    def solar_panel_charge_state(v: int):
-        if 0:
-            return "invalid"
-        elif 1:
-            return "float_charge"
-        elif 2: 
-            return "boost_charge"
-        elif 3:
-            return "equal_charge"
-        else:
-            return "fault"
-        
-    @staticmethod
-    def load_discharge_state(v: int):
-        fault = v & 2 == 1
-        if not fault and v & 1:
-            return "enabled"
-        elif not fault:
-            return "disabled"
-        elif (v >> 2) & 1:
-            return  "over_temperature"
-        elif (v >> 3) & 1:
-            return "open_circuit_protection"
-        elif (v >> 4) & 1:
-            return "hardware_protection"
-        elif (v >> 11) & 1:
-            return "short_circuit_protection"
-        else:
-            return str((v >> 12) & 0b11)
-
-    @staticmethod
-    def parse_details_response(data):
-        if len(data) != 91:
-            return None
-        subdata = data[3:-2]
-        return {
-            "equipment_id": struct.unpack_from(">H", subdata, 0)[0],
-            "run_days": struct.unpack_from(">H", subdata, 2)[0],
-            "battery_full_level": struct.unpack_from(">H", subdata, 4)[0] / 100,
-            "battery_state_1": struct.unpack_from(">H", subdata, 6)[0] & 0xF0 >> 4,
-            "battery_state_2": struct.unpack_from(">H", subdata, 6)[0] & 0x0F,
-            "solar_panel_is_charging": struct.unpack_from(">H", subdata, 8)[0] & 1 > 0,
-            "solar_panel_is_night": struct.unpack_from(">H", subdata, 8)[0] & 32 > 0,
-            "solar_panel_charge_state": BleClient.solar_panel_charge_state(struct.unpack_from(">H", subdata, 8)[0] & 0b1100 >> 2),
-            "solar_panel_state": struct.unpack_from(">H", subdata, 8)[0] & 16 > 0,
-            "load_is_enabled": struct.unpack_from(">H", subdata, 10)[0] & 1 > 0,
-            "load_state": BleClient.load_discharge_state(struct.unpack_from(">H", subdata, 10)[0]),
-            "temperature_1": struct.unpack_from(">H", subdata, 12)[0] / 100,
-            "temperature_2": struct.unpack_from(">H", subdata, 14)[0] / 100,
-            "battery_empty_times": struct.unpack_from(">H", subdata, 16)[0],
-            "battery_full_times": struct.unpack_from(">H", subdata, 18)[0],
-            "battery_percentage": struct.unpack_from(">H", subdata, 42)[0],
-            "battery_voltage": struct.unpack_from(">H", subdata, 44)[0] / 100,
-            "battery_current": struct.unpack_from(">h", subdata, 46)[0] / 100,
-            "battery_power": (struct.unpack_from(">H", subdata, 48)[0] | struct.unpack_from(">h", subdata, 50)[0] << 16) / 100,
-            "load_voltage": struct.unpack_from(">H", subdata, 52)[0] / 100,
-            "load_current": struct.unpack_from(">H", subdata, 54)[0] / 100,
-            "load_power": (struct.unpack_from(">H", subdata, 56)[0] | struct.unpack_from(">H", subdata, 58)[0] << 16) / 100,
-            "solar_panel_voltage": struct.unpack_from(">H", subdata, 60)[0] / 100,
-            "solar_panel_current": struct.unpack_from(">H", subdata, 62)[0] / 100,
-            "solar_panel_power": (struct.unpack_from(">H", subdata, 64)[0] | struct.unpack_from(">H", subdata, 66)[0] << 16) / 100,
-            "solar_panel_daily_energy": struct.unpack_from(">H", subdata, 68)[0] / 100,
-            "solar_panel_total_energy": (struct.unpack_from(">H", subdata, 70)[0] | struct.unpack_from(">H", subdata, 72)[0] << 16) / 100,
-            "load_daily_energy": struct.unpack_from(">H", subdata, 74)[0] / 100,
-            "load_total_energy": (struct.unpack_from(">H", subdata, 76)[0] | struct.unpack_from(">H", subdata, 78)[0] << 16 ) / 100,
-        }
-
-    # Map the keys to their respective units of measurement
-    @staticmethod
-    def get_unit_of_measurement(key):
-        unit_mapping = {
-            "solar_panel_is_charging": None,
-            "solar_panel_is_night": None,
-            "solar_panel_charge_state": None,
-            "solar_panel_state": None,
-            "load_is_enabled": None,
-            "load_state": None,
-            "temperature_1": "°C",
-            "temperature_2": "°C",
-            "battery_percentage": "%",
-            "battery_voltage": "V",
-            "battery_current": "A",
-            "battery_power": "W",
-            "load_voltage": "V",
-            "load_current": "A",
-            "load_power": "W",
-            "solar_panel_voltage": "V",
-            "solar_panel_current": "A",
-            "solar_panel_power": "W",
-            "solar_panel_daily_energy": "kWh",
-            "solar_panel_total_energy": "kWh",
-            "load_daily_energy": "kWh",
-            "load_total_energy": "kWh",
-        }
-        return unit_mapping.get(key, None)
+                        print("    [Descriptor] ", descriptor, e)