3 Commits 1f14ed8d0d ... a1b1a1e798

Author SHA1 Message Date
  subDesTagesMitExtraKaese a1b1a1e798 use lumiax client 2 months ago
  subDesTagesMitExtraKaese 30c5eb0ed5 create a result type 2 months ago
  subDesTagesMitExtraKaese 339592f7fd add method to check if a response is complete 2 months ago
4 changed files with 79 additions and 180 deletions
  1. 6 6
      main.py
  2. 32 147
      src/bleclient.py
  3. 19 5
      src/protocol.py
  4. 22 22
      tests/transaction_test.py

+ 6 - 6
main.py

@@ -9,12 +9,12 @@ import aiomqtt
 from bleak import BleakScanner
 from bleak.exc import BleakError, BleakDeviceNotFoundError
 
-from src.bleclient import BleClient
+from src.bleclient import BleClient, Result
 
 send_config = True
 reconnect_interval = 5  # In seconds
 
-async def mqtt_publish(details: dict[str, any], client: aiomqtt.Client):
+async def mqtt_publish(details: dict[str, Result], client: aiomqtt.Client):
     global send_config
     # Define the base topic for MQTT Discovery
     base_topic = "homeassistant"
@@ -33,11 +33,11 @@ async def mqtt_publish(details: dict[str, any], client: aiomqtt.Client):
 
         # Create the MQTT Discovery payload
         payload = {
-            "name": f"Solarlife {key.replace('_', ' ').title()}",
+            "name": f"Solarlife {value.friendly_name}",
             "device": device_info,
             "unique_id": f"solarlife_{key}",
             "state_topic": state_topic,
-            "unit_of_measurement": BleClient.get_unit_of_measurement(key)
+            "unit_of_measurement": value.unit,
         }
         if "daily_energy" in key:
             payload['device_class'] = "energy"
@@ -67,7 +67,7 @@ async def mqtt_publish(details: dict[str, any], client: aiomqtt.Client):
             await client.publish(topic, payload=json.dumps(payload), retain=True)
 
         # Publish the entity state
-        await client.publish(state_topic, payload=str(value))
+        await client.publish(state_topic, payload=str(value.value))
     send_config = False
     
 async def main(address, host, port, username, password):
@@ -82,7 +82,7 @@ async def main(address, host, port, username, password):
                                 while True:
                                     details = await mppt.request_details()
                                     if details:
-                                        print(f"Battery: {details['battery_percentage']}% ({details['battery_voltage']}V)")
+                                        print(f"Battery: {details['battery_percentage'].value}% ({details['battery_voltage'].value}V)")
                                         await mqtt_publish(details, client)
                                     else:
                                         print("No values recieved")

+ 32 - 147
src/bleclient.py

@@ -3,9 +3,10 @@ import struct
 from bleak import BleakClient, BleakScanner
 from bleak.backends.characteristic import BleakGATTCharacteristic
 
-from crc import crc16
+from src.crc import crc16
+from src.protocol import LumiaxClient, Result
 
-class BleClient:
+class BleClient(LumiaxClient):
     DEVICE_NAME_UUID = "00002a00-0000-1000-8000-00805f9b34fb"
     NOTIFY_UUID = "0000ff01-0000-1000-8000-00805f9b34fb"
     WRITE_UUID = "0000ff02-0000-1000-8000-00805f9b34fb"
@@ -14,7 +15,8 @@ class BleClient:
 
     def __init__(self, mac_address: str):
         self.client = BleakClient(mac_address)
-        self.details_queue = asyncio.Queue()  # Queue to store the received details
+        self.response_queue = asyncio.Queue()
+        self.lock = asyncio.Lock()
 
     async def __aenter__(self):
         await self.client.connect()  # Connect to the BLE device
@@ -25,41 +27,38 @@ class BleClient:
         await self.client.stop_notify(self.NOTIFY_UUID)  # Stop receiving notifications
         await self.client.disconnect()  # Disconnect from the BLE device
 
-    async def write(self, cmd: int):
-        data = cmd.to_bytes(8, 'big')  # Convert the command to a byte array
-        crc = data[-2:]  # Extract the CRC from the data
-        values = data[:-2]  # Extract the values from the data
-        crc2 = crc16(values)  # Calculate the CRC of the values
-        if crc != crc2:
-            # If the calculated CRC doesn't match the extracted CRC, replace it with the calculated CRC
-            data = values + crc2
-        #print("write ", self.WRITE_UUID, data.hex())
-        await self.client.write_gatt_char(self.WRITE_UUID, data)  # Write the data to the BLE device
-
     async def notification_handler(self, characteristic: BleakGATTCharacteristic, data: bytearray):
         if characteristic.uuid != self.NOTIFY_UUID:
             return
         self.buffer += data  # Append the received data to the buffer
-        if len(self.buffer) < 3:
+        if not self.is_complete(self.buffer):
             return
+        results = self.parse(self.start_address, buffer)
+        self.response_queue.put_nowait({r.name: r for r in results})
 
-        crc = self.buffer[-2:]  # Extract the CRC from the buffer
-        values = data[:-2]  # Extract the values from the buffer
-        crc2 = crc16(values)  # Calculate the CRC of the values
-        if crc != crc2:
-            # If the calculated CRC doesn't match the extracted CRC, ignore the data
-            pass
-        if self.buffer[0] != 0x01:
-            print("invalid start byte", self.buffer.hex())
-            self.buffer = bytearray()
-            return
-        if len(self.buffer) == 91:
-            response = BleClient.parse_details_response(self.buffer)  # Parse the details response from the buffer
-            self.details_queue.put_nowait(response)  # Add the parsed response to the queue
-            self.buffer = bytearray()
-        if len(self.buffer) >= 91:
-            print(f"received too many bytes ({len(self.buffer)})")
-            self.buffer = bytearray()  # Clear the buffer
+    async def read(self, start_address: int, count: int, repeat = 10, timeout = 5) -> dict[str, Result]:
+        async with self.lock:
+            self.start_address = start_address
+            command = self.get_read_command(0xFE, start_address, count)
+            self.response_queue = asyncio.Queue() # Clear the queue
+            i = 0
+            # send the command multiple times
+            while self.response_queue.empty() and i < repeat:
+                i += 1
+                await self.client.write_gatt_char(self.WRITE_UUID, command)
+                try:
+                    # Wait for either a response or timeout
+                    return await asyncio.wait_for(self.response_queue.get(), timeout=timeout)
+                except asyncio.TimeoutError:
+                    pass
+            return None
+
+    async def request_details(self) -> dict[str, Result]:
+        return await self.read(0x3030, 43)
+
+    async def get_device_name(self):
+        device_name = await self.client.read_gatt_char(self.DEVICE_NAME_UUID)  # Read the device name from the BLE device
+        return "".join(map(chr, device_name))
 
     async def list_services(self):
         for service in self.client.services:
@@ -71,118 +70,4 @@ class BleClient:
                         value = await self.client.read_gatt_descriptor(descriptor.handle)
                         print("    [Descriptor] ", descriptor, value)
                     except Exception as e:
-                        print("    [Descriptor] ", descriptor, e)
-
-    async def get_device_name(self):
-        device_name = await self.client.read_gatt_char(self.DEVICE_NAME_UUID)  # Read the device name from the BLE device
-        return "".join(map(chr, device_name))
-
-    async def request_details(self):
-        self.details_queue = asyncio.Queue()  # Clear the queue
-        i = 0
-        while self.details_queue.empty() and i < 10:
-            i += 1
-            await self.write(0xFE043030002bbf1a)  # Send a request for details to the BLE device
-            try:
-                # Wait for either a response or timeout
-                return await asyncio.wait_for(self.details_queue.get(), timeout=5)
-            except asyncio.TimeoutError:
-                pass
-        return None
-
-    @staticmethod
-    def solar_panel_charge_state(v: int):
-        if 0:
-            return "invalid"
-        elif 1:
-            return "float_charge"
-        elif 2: 
-            return "boost_charge"
-        elif 3:
-            return "equal_charge"
-        else:
-            return "fault"
-        
-    @staticmethod
-    def load_discharge_state(v: int):
-        fault = v & 2 == 1
-        if not fault and v & 1:
-            return "enabled"
-        elif not fault:
-            return "disabled"
-        elif (v >> 2) & 1:
-            return  "over_temperature"
-        elif (v >> 3) & 1:
-            return "open_circuit_protection"
-        elif (v >> 4) & 1:
-            return "hardware_protection"
-        elif (v >> 11) & 1:
-            return "short_circuit_protection"
-        else:
-            return str((v >> 12) & 0b11)
-
-    @staticmethod
-    def parse_details_response(data):
-        if len(data) != 91:
-            return None
-        subdata = data[3:-2]
-        return {
-            "equipment_id": struct.unpack_from(">H", subdata, 0)[0],
-            "run_days": struct.unpack_from(">H", subdata, 2)[0],
-            "battery_full_level": struct.unpack_from(">H", subdata, 4)[0] / 100,
-            "battery_state_1": struct.unpack_from(">H", subdata, 6)[0] & 0xF0 >> 4,
-            "battery_state_2": struct.unpack_from(">H", subdata, 6)[0] & 0x0F,
-            "solar_panel_is_charging": struct.unpack_from(">H", subdata, 8)[0] & 1 > 0,
-            "solar_panel_is_night": struct.unpack_from(">H", subdata, 8)[0] & 32 > 0,
-            "solar_panel_charge_state": BleClient.solar_panel_charge_state(struct.unpack_from(">H", subdata, 8)[0] & 0b1100 >> 2),
-            "solar_panel_state": struct.unpack_from(">H", subdata, 8)[0] & 16 > 0,
-            "load_is_enabled": struct.unpack_from(">H", subdata, 10)[0] & 1 > 0,
-            "load_state": BleClient.load_discharge_state(struct.unpack_from(">H", subdata, 10)[0]),
-            "temperature_1": struct.unpack_from(">H", subdata, 12)[0] / 100,
-            "temperature_2": struct.unpack_from(">H", subdata, 14)[0] / 100,
-            "battery_empty_times": struct.unpack_from(">H", subdata, 16)[0],
-            "battery_full_times": struct.unpack_from(">H", subdata, 18)[0],
-            "battery_percentage": struct.unpack_from(">H", subdata, 42)[0],
-            "battery_voltage": struct.unpack_from(">H", subdata, 44)[0] / 100,
-            "battery_current": struct.unpack_from(">h", subdata, 46)[0] / 100,
-            "battery_power": (struct.unpack_from(">H", subdata, 48)[0] | struct.unpack_from(">h", subdata, 50)[0] << 16) / 100,
-            "load_voltage": struct.unpack_from(">H", subdata, 52)[0] / 100,
-            "load_current": struct.unpack_from(">H", subdata, 54)[0] / 100,
-            "load_power": (struct.unpack_from(">H", subdata, 56)[0] | struct.unpack_from(">H", subdata, 58)[0] << 16) / 100,
-            "solar_panel_voltage": struct.unpack_from(">H", subdata, 60)[0] / 100,
-            "solar_panel_current": struct.unpack_from(">H", subdata, 62)[0] / 100,
-            "solar_panel_power": (struct.unpack_from(">H", subdata, 64)[0] | struct.unpack_from(">H", subdata, 66)[0] << 16) / 100,
-            "solar_panel_daily_energy": struct.unpack_from(">H", subdata, 68)[0] / 100,
-            "solar_panel_total_energy": (struct.unpack_from(">H", subdata, 70)[0] | struct.unpack_from(">H", subdata, 72)[0] << 16) / 100,
-            "load_daily_energy": struct.unpack_from(">H", subdata, 74)[0] / 100,
-            "load_total_energy": (struct.unpack_from(">H", subdata, 76)[0] | struct.unpack_from(">H", subdata, 78)[0] << 16 ) / 100,
-        }
-
-    # Map the keys to their respective units of measurement
-    @staticmethod
-    def get_unit_of_measurement(key):
-        unit_mapping = {
-            "solar_panel_is_charging": None,
-            "solar_panel_is_night": None,
-            "solar_panel_charge_state": None,
-            "solar_panel_state": None,
-            "load_is_enabled": None,
-            "load_state": None,
-            "temperature_1": "°C",
-            "temperature_2": "°C",
-            "battery_percentage": "%",
-            "battery_voltage": "V",
-            "battery_current": "A",
-            "battery_power": "W",
-            "load_voltage": "V",
-            "load_current": "A",
-            "load_power": "W",
-            "solar_panel_voltage": "V",
-            "solar_panel_current": "A",
-            "solar_panel_power": "W",
-            "solar_panel_daily_energy": "kWh",
-            "solar_panel_total_energy": "kWh",
-            "load_daily_energy": "kWh",
-            "load_total_energy": "kWh",
-        }
-        return unit_mapping.get(key, None)
+                        print("    [Descriptor] ", descriptor, e)

+ 19 - 5
src/protocol.py

@@ -1,4 +1,5 @@
 import struct
+from dataclasses import dataclass
 from typing import Any
 
 from .variables import variables, Variable, FunctionCodes
@@ -6,9 +7,13 @@ from .crc import crc16
 
 type Value = str|int|float
 
+@dataclass
+class Result(Variable):
+    value: Value
+
 class LumiaxClient:
     def __init__(self):
-        pass
+        self.device_id = 0xFE
 
     def bytes_to_value(self, variable: Variable, buffer: bytes, offset: int) -> Value:
         if variable.is_32_bit and variable.is_signed:
@@ -129,11 +134,20 @@ class LumiaxClient:
         result = header + bytes(data)
         return result + crc16(result)
 
-
-
-    def parse(self, start_address: int, buffer: bytes) -> list[(Variable, Value)]:
+    def is_complete(self, buffer: bytes) -> bool:
+        if len(buffer) < 4:
+            return False
         device_id = buffer[0]
         function_code = FunctionCodes(buffer[1])
+        if function_code in [FunctionCodes.READ_MEMORY, FunctionCodes.READ_PARAMETER, FunctionCodes.READ_STATUS_REGISTER]:
+            data_length = buffer[2]
+            return len(buffer) >= data_length + 5
+        else:
+            return len(buffer) >= 8
+
+    def parse(self, start_address: int, buffer: bytes) -> list[Result]:
+        self.device_id = buffer[0]
+        function_code = FunctionCodes(buffer[1])
         if function_code in [FunctionCodes.READ_MEMORY, FunctionCodes.READ_PARAMETER, FunctionCodes.READ_STATUS_REGISTER]:
             data_length = buffer[2]
             received_crc = buffer[3+data_length:3+data_length+2]
@@ -148,7 +162,7 @@ class LumiaxClient:
                 items = [v for v in variables if address == v.address and function_code.value in v.function_codes]
                 for variable in items:
                     value = self.bytes_to_value(variable, buffer, cursor)
-                    results.append((variable, value))
+                    results.append(Result(**vars(variable), value=value))
                 cursor += 2
                 address += 1
 

+ 22 - 22
tests/transaction_test.py

@@ -26,11 +26,11 @@ class TestTransaction(unittest.TestCase):
         self.assertEqual(len(recv_buf) - 5, byte_count)
 
         results = self.client.parse(start_address, recv_buf)
-        for variable, value in results:
-            self.assertIsNotNone(value, f"{variable.name} ({hex(variable.address)})")
+        for result in results:
+            self.assertIsNotNone(result.value, f"{result.name} ({hex(result.address)})")
         self.assertGreaterEqual(len(results), byte_count / 2)
-        self.assertEqual(hex(results[0][0].address), hex(start_address))
-        self.assertEqual(hex(results[-1][0].address), hex(end_address))
+        self.assertEqual(hex(results[0].address), hex(start_address))
+        self.assertEqual(hex(results[-1].address), hex(end_address))
 
     def test_2(self):
         device_id = 0x01
@@ -48,12 +48,12 @@ class TestTransaction(unittest.TestCase):
         self.assertEqual(len(recv_buf) - 5, byte_count)
 
         results = self.client.parse(start_address, recv_buf)
-        for variable, value in results:
-            self.assertIsNotNone(value, f"{variable.name} ({hex(variable.address)})")
+        for result in results:
+            self.assertIsNotNone(result.value, f"{result.name} ({hex(result.address)})")
         self.assertGreaterEqual(len(results), byte_count / 2)
-        self.assertEqual(hex(results[0][0].address), hex(start_address))
-        self.assertEqual(results[-1][0].is_32_bit, True)
-        self.assertEqual(hex(results[-1][0].address), hex(end_address-1))
+        self.assertEqual(hex(results[0].address), hex(start_address))
+        self.assertEqual(results[-1].is_32_bit, True)
+        self.assertEqual(hex(results[-1].address), hex(end_address-1))
 
     def test_3(self):
         device_id = 0x01
@@ -71,12 +71,12 @@ class TestTransaction(unittest.TestCase):
         self.assertEqual(len(recv_buf) - 5, byte_count)
 
         results = self.client.parse(start_address, recv_buf)
-        for variable, value in results:
-            self.assertIsNotNone(value, f"{variable.name} ({hex(variable.address)})")
+        for result in results:
+            self.assertIsNotNone(result.value, f"{result.name} ({hex(result.address)})")
         self.assertGreaterEqual(len(results), byte_count / 2)
-        self.assertEqual(hex(results[0][0].address), hex(start_address))
-        self.assertEqual(hex(results[-1][0].address), hex(end_address))
-        self.assertEqual(results[0][1], 60.0)
+        self.assertEqual(hex(results[0].address), hex(start_address))
+        self.assertEqual(hex(results[-1].address), hex(end_address))
+        self.assertEqual(results[0].value, 60.0)
 
     def test_4(self):
         device_id = 0x01
@@ -94,10 +94,10 @@ class TestTransaction(unittest.TestCase):
         self.assertEqual(len(recv_buf) - 5, byte_count)
 
         results = self.client.parse(start_address, recv_buf)
-        for variable, value in results:
-            self.assertIsNotNone(value, f"{variable.name} ({hex(variable.address)})")
-        self.assertEqual(hex(results[0][0].address), hex(start_address))
-        self.assertEqual(hex(results[-1][0].address), hex(end_address))
+        for result in results:
+            self.assertIsNotNone(result.value, f"{result.name} ({hex(result.address)})")
+        self.assertEqual(hex(results[0].address), hex(start_address))
+        self.assertEqual(hex(results[-1].address), hex(end_address))
 
     def test_5(self):
         device_id = 0x01
@@ -115,11 +115,11 @@ class TestTransaction(unittest.TestCase):
         self.assertEqual(len(recv_buf) - 5, byte_count)
 
         results = self.client.parse(start_address, recv_buf)
-        for variable, value in results:
-            self.assertIsNotNone(value, f"{variable.name} ({hex(variable.address)})")
+        for result in results:
+            self.assertIsNotNone(result.value, f"{result.name} ({hex(result.address)})")
         self.assertGreaterEqual(len(results), byte_count / 2)
-        self.assertEqual(hex(results[0][0].address), hex(start_address))
-        self.assertEqual(hex(results[-1][0].address), hex(end_address))
+        self.assertEqual(hex(results[0].address), hex(start_address))
+        self.assertEqual(hex(results[-1].address), hex(end_address))
 
     def test_6(self):
         device_id = 0x01