Bladeren bron

Merge branch 'modbus-parser-dev'

subDesTagesMitExtraKaese 1 maand geleden
bovenliggende
commit
0d2d75ec68
15 gewijzigde bestanden met toevoegingen van 1287 en 293 verwijderingen
  1. 1 0
      .gitignore
  2. 0 188
      bleclient.py
  3. 0 13
      crc.py
  4. 87 92
      main.py
  5. 0 0
      src/__init__.py
  6. 106 0
      src/bleclient.py
  7. 11 0
      src/crc.py
  8. 157 0
      src/homeassistant.py
  9. 252 0
      src/protocol.py
  10. 405 0
      src/variables.py
  11. 9 0
      tests/__main__.py
  12. 70 0
      tests/compatibility_test.py
  13. 9 0
      tests/main_test.py
  14. 146 0
      tests/transaction_test.py
  15. 34 0
      tests/variable_test.py

+ 1 - 0
.gitignore

@@ -1 +1,2 @@
 *.pyc
+venv/

+ 0 - 188
bleclient.py

@@ -1,188 +0,0 @@
-import asyncio
-import struct
-from bleak import BleakClient, BleakScanner
-from bleak.backends.characteristic import BleakGATTCharacteristic
-
-from crc import crc16
-
-class BleClient:
-    DEVICE_NAME_UUID = "00002a00-0000-1000-8000-00805f9b34fb"
-    NOTIFY_UUID = "0000ff01-0000-1000-8000-00805f9b34fb"
-    WRITE_UUID = "0000ff02-0000-1000-8000-00805f9b34fb"
-
-    buffer = bytearray()
-
-    def __init__(self, mac_address: str):
-        self.client = BleakClient(mac_address)
-        self.details_queue = asyncio.Queue()  # Queue to store the received details
-
-    async def __aenter__(self):
-        await self.client.connect()  # Connect to the BLE device
-        await self.client.start_notify(self.NOTIFY_UUID, self.notification_handler)  # Start receiving notifications
-        return self
-
-    async def __aexit__(self, exc_type, exc, tb):
-        await self.client.stop_notify(self.NOTIFY_UUID)  # Stop receiving notifications
-        await self.client.disconnect()  # Disconnect from the BLE device
-
-    async def write(self, cmd: int):
-        data = cmd.to_bytes(8, 'big')  # Convert the command to a byte array
-        crc = data[-2:]  # Extract the CRC from the data
-        values = data[:-2]  # Extract the values from the data
-        crc2 = crc16(values)  # Calculate the CRC of the values
-        if crc != crc2:
-            # If the calculated CRC doesn't match the extracted CRC, replace it with the calculated CRC
-            data = values + crc2
-        #print("write ", self.WRITE_UUID, data.hex())
-        await self.client.write_gatt_char(self.WRITE_UUID, data)  # Write the data to the BLE device
-
-    async def notification_handler(self, characteristic: BleakGATTCharacteristic, data: bytearray):
-        if characteristic.uuid != self.NOTIFY_UUID:
-            return
-        self.buffer += data  # Append the received data to the buffer
-        if len(self.buffer) < 3:
-            return
-
-        crc = self.buffer[-2:]  # Extract the CRC from the buffer
-        values = data[:-2]  # Extract the values from the buffer
-        crc2 = crc16(values)  # Calculate the CRC of the values
-        if crc != crc2:
-            # If the calculated CRC doesn't match the extracted CRC, ignore the data
-            pass
-        if self.buffer[0] != 0x01:
-            print("invalid start byte", self.buffer.hex())
-            self.buffer = bytearray()
-            return
-        if len(self.buffer) == 91:
-            response = BleClient.parse_details_response(self.buffer)  # Parse the details response from the buffer
-            self.details_queue.put_nowait(response)  # Add the parsed response to the queue
-            self.buffer = bytearray()
-        if len(self.buffer) >= 91:
-            print(f"received too many bytes ({len(self.buffer)})")
-            self.buffer = bytearray()  # Clear the buffer
-
-    async def list_services(self):
-        for service in self.client.services:
-            print("[Service] %s", service)
-            for char in service.characteristics:
-                print("  [Characteristic] ", char, ",".join(char.properties))
-                for descriptor in char.descriptors:
-                    try:
-                        value = await self.client.read_gatt_descriptor(descriptor.handle)
-                        print("    [Descriptor] ", descriptor, value)
-                    except Exception as e:
-                        print("    [Descriptor] ", descriptor, e)
-
-    async def get_device_name(self):
-        device_name = await self.client.read_gatt_char(self.DEVICE_NAME_UUID)  # Read the device name from the BLE device
-        return "".join(map(chr, device_name))
-
-    async def request_details(self):
-        self.details_queue = asyncio.Queue()  # Clear the queue
-        i = 0
-        while self.details_queue.empty() and i < 10:
-            i += 1
-            await self.write(0xFE043030002bbf1a)  # Send a request for details to the BLE device
-            try:
-                # Wait for either a response or timeout
-                return await asyncio.wait_for(self.details_queue.get(), timeout=5)
-            except asyncio.TimeoutError:
-                pass
-        return None
-
-    @staticmethod
-    def solar_panel_charge_state(v: int):
-        if 0:
-            return "invalid"
-        elif 1:
-            return "float_charge"
-        elif 2: 
-            return "boost_charge"
-        elif 3:
-            return "equal_charge"
-        else:
-            return "fault"
-        
-    @staticmethod
-    def load_discharge_state(v: int):
-        fault = v & 2 == 1
-        if not fault and v & 1:
-            return "enabled"
-        elif not fault:
-            return "disabled"
-        elif (v >> 2) & 1:
-            return  "over_temperature"
-        elif (v >> 3) & 1:
-            return "open_circuit_protection"
-        elif (v >> 4) & 1:
-            return "hardware_protection"
-        elif (v >> 11) & 1:
-            return "short_circuit_protection"
-        else:
-            return str((v >> 12) & 0b11)
-
-    @staticmethod
-    def parse_details_response(data):
-        if len(data) != 91:
-            return None
-        subdata = data[3:-2]
-        return {
-            "equipment_id": struct.unpack_from(">H", subdata, 0)[0],
-            "run_days": struct.unpack_from(">H", subdata, 2)[0],
-            "battery_full_level": struct.unpack_from(">H", subdata, 4)[0] / 100,
-            "battery_state_1": struct.unpack_from(">H", subdata, 6)[0] & 0xF0 >> 4,
-            "battery_state_2": struct.unpack_from(">H", subdata, 6)[0] & 0x0F,
-            "solar_panel_is_charging": struct.unpack_from(">H", subdata, 8)[0] & 1 > 0,
-            "solar_panel_is_night": struct.unpack_from(">H", subdata, 8)[0] & 32 > 0,
-            "solar_panel_charge_state": BleClient.solar_panel_charge_state(struct.unpack_from(">H", subdata, 8)[0] & 0b1100 >> 2),
-            "solar_panel_state": struct.unpack_from(">H", subdata, 8)[0] & 16 > 0,
-            "load_is_enabled": struct.unpack_from(">H", subdata, 10)[0] & 1 > 0,
-            "load_state": BleClient.load_discharge_state(struct.unpack_from(">H", subdata, 10)[0]),
-            "temperature_1": struct.unpack_from(">H", subdata, 12)[0] / 100,
-            "temperature_2": struct.unpack_from(">H", subdata, 14)[0] / 100,
-            "battery_empty_times": struct.unpack_from(">H", subdata, 16)[0],
-            "battery_full_times": struct.unpack_from(">H", subdata, 18)[0],
-            "battery_percentage": struct.unpack_from(">H", subdata, 42)[0],
-            "battery_voltage": struct.unpack_from(">H", subdata, 44)[0] / 100,
-            "battery_current": struct.unpack_from(">h", subdata, 46)[0] / 100,
-            "battery_power": (struct.unpack_from(">H", subdata, 48)[0] | struct.unpack_from(">h", subdata, 50)[0] << 16) / 100,
-            "load_voltage": struct.unpack_from(">H", subdata, 52)[0] / 100,
-            "load_current": struct.unpack_from(">H", subdata, 54)[0] / 100,
-            "load_power": (struct.unpack_from(">H", subdata, 56)[0] | struct.unpack_from(">H", subdata, 58)[0] << 16) / 100,
-            "solar_panel_voltage": struct.unpack_from(">H", subdata, 60)[0] / 100,
-            "solar_panel_current": struct.unpack_from(">H", subdata, 62)[0] / 100,
-            "solar_panel_power": (struct.unpack_from(">H", subdata, 64)[0] | struct.unpack_from(">H", subdata, 66)[0] << 16) / 100,
-            "solar_panel_daily_energy": struct.unpack_from(">H", subdata, 68)[0] / 100,
-            "solar_panel_total_energy": (struct.unpack_from(">H", subdata, 70)[0] | struct.unpack_from(">H", subdata, 72)[0] << 16) / 100,
-            "load_daily_energy": struct.unpack_from(">H", subdata, 74)[0] / 100,
-            "load_total_energy": (struct.unpack_from(">H", subdata, 76)[0] | struct.unpack_from(">H", subdata, 78)[0] << 16 ) / 100,
-        }
-
-    # Map the keys to their respective units of measurement
-    @staticmethod
-    def get_unit_of_measurement(key):
-        unit_mapping = {
-            "solar_panel_is_charging": None,
-            "solar_panel_is_night": None,
-            "solar_panel_charge_state": None,
-            "solar_panel_state": None,
-            "load_is_enabled": None,
-            "load_state": None,
-            "temperature_1": "°C",
-            "temperature_2": "°C",
-            "battery_percentage": "%",
-            "battery_voltage": "V",
-            "battery_current": "A",
-            "battery_power": "W",
-            "load_voltage": "V",
-            "load_current": "A",
-            "load_power": "W",
-            "solar_panel_voltage": "V",
-            "solar_panel_current": "A",
-            "solar_panel_power": "W",
-            "solar_panel_daily_energy": "kWh",
-            "solar_panel_total_energy": "kWh",
-            "load_daily_energy": "kWh",
-            "load_total_energy": "kWh",
-        }
-        return unit_mapping.get(key, None)

+ 0 - 13
crc.py

@@ -1,13 +0,0 @@
-def crc16(data):
-    bArr2 = [0, -63, -127, 64, 1, -64, -128, 65, 1, -64, -128, 65, 0, -63, -127, 64, 1, -64, -128, 65, 0, -63, -127, 64, 0, -63, -127, 64, 1, -64, -128, 65, 1, -64, -128, 65, 0, -63, -127, 64, 0, -63, -127, 64, 1, -64, -128, 65, 0, -63, -127, 64, 1, -64, -128, 65, 1, -64, -128, 65, 0, -63, -127, 64, 1, -64, -128, 65, 0, -63, -127, 64, 0, -63, -127, 64, 1, -64, -128, 65, 0, -63, -127, 64, 1, -64, -128, 65, 1, -64, -128, 65, 0, -63, -127, 64, 0, -63, -127, 64, 1, -64, -128, 65, 1, -64, -128, 65, 0, -63, -127, 64, 1, -64, -128, 65, 0, -63, -127, 64, 0, -63, -127, 64, 1, -64, -128, 65, 1, -64, -128, 65, 0, -63, -127, 64, 0, -63, -127, 64, 1, -64, -128, 65, 0, -63, -127, 64, 1, -64, -128, 65, 1, -64, -128, 65, 0, -63, -127, 64, 0, -63, -127, 64, 1, -64, -128, 65, 1, -64, -128, 65, 0, -63, -127, 64, 1, -64, -128, 65, 0, -63, -127, 64, 0, -63, -127, 64, 1, -64, -128, 65, 0, -63, -127, 64, 1, -64, -128, 65, 1, -64, -128, 65, 0, -63, -127, 64, 1, -64, -128, 65, 0, -63, -127, 64, 0, -63, -127, 64, 1, -64, -128, 65, 1, -64, -128, 65, 0, -63, -127, 64, 0, -63, -127, 64, 1, -64, -128, 65, 0, -63, -127, 64, 1, -64, -128, 65, 1, -64, -128, 65, 0, -63, -127, 64]
-    bArr3 = [0, -64, -63, 1, -61, 3, 2, -62, -58, 6, 7, -57, 5, -59, -60, 4, -52, 12, 13, -51, 15, -49, -50, 14, 10, -54, -53, 11, -55, 9, 8, -56, -40, 24, 25, -39, 27, -37, -38, 26, 30, -34, -33, 31, -35, 29, 28, -36, 20, -44, -43, 21, -41, 23, 22, -42, -46, 18, 19, -45, 17, -47, -48, 16, -16, 48, 49, -15, 51, -13, -14, 50, 54, -10, -9, 55, -11, 53, 52, -12, 60, -4, -3, 61, -1, 63, 62, -2, -6, 58, 59, -5, 57, -7, -8, 56, 40, -24, -23, 41, -21, 43, 42, -22, -18, 46, 47, -17, 45, -19, -20, 44, -28, 36, 37, -27, 39, -25, -26, 38, 34, -30, -29, 35, -31, 33, 32, -32, -96, 96, 97, -95, 99, -93, -94, 98, 102, -90, -89, 103, -91, 101, 100, -92, 108, -84, -83, 109, -81, 111, 110, -82, -86, 106, 107, -85, 105, -87, -88, 104, 120, -72, -71, 121, -69, 123, 122, -70, -66, 126, 127, -65, 125, -67, -68, 124, -76, 116, 117, -75, 119, -73, -74, 118, 114, -78, -77, 115, -79, 113, 112, -80, 80, -112, -111, 81, -109, 83, 82, -110, -106, 86, 87, -105, 85, -107, -108, 84, -100, 92, 93, -99, 95, -97, -98, 94, 90, -102, -101, 91, -103, 89, 88, -104, -120, 72, 73, -119, 75, -117, -118, 74, 78, -114, -113, 79, -115, 77, 76, -116, 68, -124, -123, 69, -121, 71, 70, -122, -126, 66, 67, -125, 65, -127, -128, 64]
-    i = 0
-    b2 = 255
-    b3 = 255
-    for i in range(len(data)):
-        b4 = (b3 ^ data[i]) & 255
-        b5 = bArr3[b4] & 255
-        b3 = b2 ^ (bArr2[b4] & 255)
-        b2 = b5
-    b6 = ((b2 & 255) << 8) | (b3 & 255 & 65535)
-    return (((b6 & 255) << 8) | ((65280 & b6) >> 8)).to_bytes(2, 'big')

+ 87 - 92
main.py

@@ -3,108 +3,83 @@
 import argparse
 import asyncio
 import signal
-import json
+import traceback
 
 import aiomqtt
+from bleak import BleakScanner
 from bleak.exc import BleakError, BleakDeviceNotFoundError
 
-from bleclient import BleClient
+from src.homeassistant import MqttSensor
+from src.bleclient import BleClient, Result
+from src.variables import variables, VariableContainer, battery_and_load_parameters, switches
 
-send_config = True
+request_interval = 20   # In seconds
 reconnect_interval = 5  # In seconds
 
-async def mqtt_publish(details: dict[str, any], client: aiomqtt.Client):
-    global send_config
-    # Define the base topic for MQTT Discovery
-    base_topic = "homeassistant"
-
-    # Define the device information
-    device_info = {
-        "identifiers": ["solarlife_mppt_ble"],
-        "name": "Solarlife MPPT",
-        "manufacturer": "Solarlife",
-    }
-
-    # Publish each item in the details dictionary to its own MQTT topic
-    for key, value in details.items():
-        state_topic = f"{base_topic}/sensor/solarlife/{key}/state"
-        topic = f"{base_topic}/sensor/solarlife/{key}/config"
-
-        # Create the MQTT Discovery payload
-        payload = {
-            "name": f"Solarlife {key.replace('_', ' ').title()}",
-            "device": device_info,
-            "unique_id": f"solarlife_{key}",
-            "state_topic": state_topic,
-            "unit_of_measurement": BleClient.get_unit_of_measurement(key)
-        }
-        if "daily_energy" in key:
-            payload['device_class'] = "energy"
-            payload['state_class'] = "total_increasing"
-        elif "total_energy" in key:
-            payload['device_class'] = "energy"
-            payload['state_class'] = "total"
-        elif "voltage" in key:
-            payload['device_class'] = "voltage"
-            payload['state_class'] = "measurement"
-        elif "current" in key:
-            payload['device_class'] = "current"
-            payload['state_class'] = "measurement"
-        elif "power" in key:
-            payload['device_class'] = "power"
-            payload['state_class'] = "measurement"
-        elif "temperature" in key:
-            payload['device_class'] = "temperature"
-            payload['state_class'] = "measurement"
-        elif key == "battery_percentage":
-            payload['device_class'] = "battery"
-            payload['state_class'] = "measurement"
-
-        # Publish the MQTT Discovery payload
-        if send_config:
-            print(f"Publishing MQTT Discovery payload for {key}")
-            await client.publish(topic, payload=json.dumps(payload), retain=True)
-
-        # Publish the entity state
-        await client.publish(state_topic, payload=str(value))
-    send_config = False
-    
-async def main(address, host, port, username, password):
-    async def run_mppt():
-        while True:
+async def request_and_publish_details(sensor: MqttSensor, mppt: BleClient) -> None:
+    details = await mppt.request_details()
+    if details:
+        print(f"Battery: {details['battery_percentage'].value}% ({details['battery_voltage'].value}V)")
+        await sensor.publish(details)
+    else:
+        print("No values recieved")
+
+async def subscribe_and_watch(sensor: MqttSensor, mppt: BleClient):
+    parameters = battery_and_load_parameters[:12] + switches
+    await sensor.subscribe(parameters)
+    while True:
+        command = await sensor.get_command()
+        print(f"Received command to set {command.name} to '{command.value}'")
+        results = await mppt.write([command])
+        await sensor.publish(results)
+
+async def run_mppt(sensor: MqttSensor, address: str):
+    task = None
+    loop = asyncio.get_event_loop()
+    try:
+        async with BleClient(address) as mppt:
+            task = loop.create_task(subscribe_and_watch(sensor, mppt))
+            parameters = await mppt.request_parameters()
+            await sensor.publish(parameters)
+            while True:
+                await request_and_publish_details(sensor, mppt)
+                await asyncio.sleep(request_interval)
+                if not task.cancelled() and task.exception:
+                    break
+    except asyncio.TimeoutError:
+        print("BLE communication timed out")
+    except BleakDeviceNotFoundError:
+        print(f"BLE device with address {address} was not found")
+    except BleakError as e:
+        print(f"BLE error occurred: {e}")
+    finally:
+        if task:
+            task.cancel()
             try:
-                async with aiomqtt.Client(hostname=host, port=port, username=username, password=password) as client:
-                    print(f"Connecting to MQTT broker at {host}:{port}")
-                    while True:
-                        try:
-                            async with BleClient(address) as mppt:
-                                while True:
-                                    details = await mppt.request_details()
-                                    if details:
-                                        print(f"Battery: {details['battery_percentage']}% ({details['battery_voltage']}V)")
-                                        await mqtt_publish(details, client)
-                                    else:
-                                        print("No values recieved")
-                                    await asyncio.sleep(20.0)
-
-                        except BleakDeviceNotFoundError:
-                            print(f"BLE device with address {address} was not found")
-                            await asyncio.sleep(5)
-                        except BleakError as e:
-                            print(f"BLE error occurred: {e}")
-                            await asyncio.sleep(5)
-            except aiomqtt.MqttError as error:
-                print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
-                await asyncio.sleep(reconnect_interval)
+                await task
             except asyncio.CancelledError:
-                raise  # Re-raise the CancelledError to stop the task
-            except Exception as e:
-                print(f"An error occurred during BLE communication: {e}")
-                await asyncio.sleep(5)  # Wait for 5 seconds before retrying
-
+                pass
+
+async def run_mqtt(address, host, port, username, password):
+    while True:
+        try:
+            async with MqttSensor(hostname=host, port=port, username=username, password=password) as sensor:
+                print(f"Connected to MQTT broker at {host}:{port}")
+                while True:
+                    await run_mppt(sensor, address)
+                    await asyncio.sleep(reconnect_interval)
+        except aiomqtt.MqttError as error:
+            print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
+        except asyncio.CancelledError:
+            raise  # Re-raise the CancelledError to stop the task
+        except Exception as e:
+            print(traceback.format_exc())
+        await asyncio.sleep(reconnect_interval)
+
+async def main(*args):
     try:
         loop = asyncio.get_running_loop()
-        task = loop.create_task(run_mppt())
+        task = loop.create_task(run_mqtt(*args))
 
         # Setup signal handler to cancel the task on termination
         for signame in {'SIGINT', 'SIGTERM'}:
@@ -116,6 +91,19 @@ async def main(address, host, port, username, password):
     except asyncio.CancelledError:
         pass  # Task was cancelled, no need for an error message
 
+async def list_services(address):
+    async with BleClient(address) as mppt:
+        await mppt.list_services()
+
+async def scan_for_devices():
+    devices = await BleakScanner.discover()
+    if not devices:
+        print("No BLE devices found.")
+    else:
+        print("Available BLE devices:")
+        for device in devices:
+            print(f"{device.address} - {device.name}")
+    return devices
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description='Solarlife MPPT BLE Client')
@@ -124,7 +112,14 @@ if __name__ == '__main__':
     parser.add_argument('--port', help='MQTT broker port', default=1883, type=int)
     parser.add_argument('--username', help='MQTT username')
     parser.add_argument('--password', help='MQTT password')
+    parser.add_argument('--list-services', help='List GATT services', action='store_true')
+    parser.add_argument('--scan', help='Scan for bluetooth devices', action='store_true')
 
     args = parser.parse_args()
 
-    asyncio.run(main(args.address, args.host, args.port, args.username, args.password))
+    if args.scan:
+        asyncio.run(scan_for_devices())
+    elif args.list_services:
+        asyncio.run(list_services(args.address))
+    else:
+        asyncio.run(main(args.address, args.host, args.port, args.username, args.password))

+ 0 - 0
src/__init__.py


+ 106 - 0
src/bleclient.py

@@ -0,0 +1,106 @@
+import asyncio
+import struct
+from bleak import BleakClient, BleakScanner
+from bleak.backends.characteristic import BleakGATTCharacteristic
+
+from src.crc import crc16
+from src.protocol import LumiaxClient, ResultContainer, Result
+
+class BleClient(LumiaxClient):
+    DEVICE_NAME_UUID = "00002a00-0000-1000-8000-00805f9b34fb"
+    NOTIFY_UUID = "0000ff01-0000-1000-8000-00805f9b34fb"
+    WRITE_UUID = "0000ff02-0000-1000-8000-00805f9b34fb"
+
+    buffer = bytearray()
+
+    def __init__(self, mac_address: str):
+        self.client = BleakClient(mac_address)
+        self.response_queue = asyncio.Queue()
+        self.lock = asyncio.Lock()
+
+    async def __aenter__(self):
+        await self.client.connect()  # Connect to the BLE device
+        await self.client.start_notify(self.NOTIFY_UUID, self.notification_handler)  # Start receiving notifications
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        await self.client.stop_notify(self.NOTIFY_UUID)  # Stop receiving notifications
+        await self.client.disconnect()  # Disconnect from the BLE device
+
+    def notification_handler(self, characteristic: BleakGATTCharacteristic, data: bytearray):
+        if characteristic.uuid != self.NOTIFY_UUID:
+            return
+        self.buffer += data  # Append the received data to the buffer
+        try:
+            if not self.is_complete(self.buffer):
+                return
+            results = self.parse(self.start_address, self.buffer)
+            self.response_queue.put_nowait(results)
+        except Exception as e:
+            print(f"Response from device: 0x{self.buffer.hex()}")
+            print(f"Error while parsing response: {e}")
+
+    async def read(self, start_address: int, count: int, repeat = 10, timeout = 5) -> ResultContainer:
+        async with self.lock:
+            self.start_address = start_address
+            command = self.get_read_command(0xFE, start_address, count)
+            self.response_queue = asyncio.Queue() # Clear the queue
+            i = 0
+            # send the command multiple times
+            while i < repeat:
+                i += 1
+                self.buffer = bytearray()
+                await self.client.write_gatt_char(self.WRITE_UUID, command)
+                try:
+                    # Wait for either a response or timeout
+                    return await asyncio.wait_for(self.response_queue.get(), timeout=timeout)
+                except asyncio.TimeoutError:
+                    print(f"Repeating read command...")
+            return ResultContainer([])
+
+    async def request_details(self) -> ResultContainer:
+        return await self.read(0x3030, 41)
+
+    async def request_parameters(self) -> ResultContainer:
+        return await self.read(0x9021, 12)
+    
+    async def write(self, results: list[Result], repeat = 10, timeout = 5) -> ResultContainer:
+        async with self.lock:
+            start_address, command = self.get_write_command(0xFE, results)
+            self.start_address = start_address
+            self.response_queue = asyncio.Queue() # Clear the queue
+            i = 0
+            # send the command multiple times
+            while i < repeat:
+                i += 1
+                self.buffer = bytearray()
+                await self.client.write_gatt_char(self.WRITE_UUID, command)
+                print(f"Wrote command 0x{command.hex()}")
+                try:
+                    # Wait for either a response or timeout
+                    await asyncio.wait_for(self.response_queue.get(), timeout=timeout)
+                    return ResultContainer(results)
+                except asyncio.TimeoutError:
+                    print(f"Repeating write command...")
+            return ResultContainer([])
+
+    async def get_device_name(self):
+        device_name = await self.client.read_gatt_char(self.DEVICE_NAME_UUID)  # Read the device name from the BLE device
+        return "".join(map(chr, device_name))
+
+    async def list_services(self):
+        service_text = "[Service]          "
+        charact_text = "  [Characteristic] "
+        descrip_text = "    [Descriptor]   "
+        value_text   = "      Value = "
+        for service in self.client.services:
+            print(service_text, service)
+            for char in service.characteristics:
+                print(charact_text, char, ",".join(char.properties))
+                for descriptor in char.descriptors:
+                    try:
+                        print(descrip_text, descriptor)
+                        value = await self.client.read_gatt_descriptor(descriptor.handle)
+                        print(value_text, "0x" + value.hex())
+                    except Exception as e:
+                        pass

+ 11 - 0
src/crc.py

@@ -0,0 +1,11 @@
+def crc16(data: bytes) -> bytes:
+    crc = 0xFFFF
+    for n in range(len(data)):
+        crc ^= data[n]
+        for i in range(8):
+            if crc & 1:
+                crc >>= 1
+                crc ^= 0xA001
+            else:
+                crc >>= 1
+    return crc.to_bytes(2, 'little')

+ 157 - 0
src/homeassistant.py

@@ -0,0 +1,157 @@
+import json
+import re
+
+from aiomqtt import Client
+
+from src.protocol import ResultContainer, Result, FunctionCodes
+from src.variables import VariableContainer, Variable, variables
+
+class MqttSensor(Client):
+    # Define the base topic for MQTT Discovery
+    base_topic = "homeassistant"
+
+    # Define the sensor name
+    sensor_name = "solarlife"
+
+    # Define the device information
+    device_info = {
+        "identifiers": ["solarlife_mppt_ble"],
+        "name": "Solarlife",
+        "manufacturer": "Solarlife",
+    }
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.known_names = set()
+        self.subscribed_names = set()
+
+    # https://www.home-assistant.io/integrations/#search/mqtt
+    def get_platform(self, variable: Variable) -> str:
+        is_writable = FunctionCodes.WRITE_MEMORY_SINGLE.value in variable.function_codes or \
+                      FunctionCodes.WRITE_STATUS_REGISTER.value in variable.function_codes
+        is_numeric = variable.multiplier != 0
+        if variable.binary_payload:
+            on, off = variable.binary_payload
+            if is_writable and off:
+                return "switch"
+            elif is_writable:
+                return "button"
+            else:
+                return "binary_sensor"
+        elif is_writable and is_numeric:
+            return "number"
+        elif is_writable:
+            # to-do: select or text 
+            pass
+        return "sensor"
+
+    def get_config_topic(self, variable: Variable) -> str:
+        platform = self.get_platform(variable)
+        return f"{self.base_topic}/{platform}/{self.sensor_name}/{variable.name}/config"
+    
+    def get_state_topic(self, variable: Variable) -> str:
+        platform = self.get_platform(variable)
+        return f"{self.base_topic}/{platform}/{self.sensor_name}/{variable.name}/state"
+
+    def get_command_topic(self, variable: Variable) -> str:
+        platform = self.get_platform(variable)
+        return f"{self.base_topic}/{platform}/{self.sensor_name}/{variable.name}/command"
+
+    async def store_config(self, variables: VariableContainer) -> None:
+        # Publish each item in the results to its own MQTT topic
+        for key, variable in variables.items():
+            if key in self.known_names:
+                continue
+            self.known_names.add(key)
+
+            platform = self.get_platform(variable)
+            config_topic = self.get_config_topic(variable)
+            state_topic = self.get_state_topic(variable)
+            command_topic = self.get_command_topic(variable)
+
+            print(f"Publishing homeassistant config for {platform} {key}")
+
+            # Create the MQTT Discovery payload
+            payload = {
+                "name": variable.friendly_name,
+                "device": self.device_info,
+                "object_id": key,
+                "unique_id": f"solarlife_{key}",
+                "state_topic": state_topic,
+            }
+
+            if variable.multiplier != 0:
+                payload["unit_of_measurement"] = variable.unit
+                payload["mode"] = "box"
+                payload["min"] = 0
+                
+            if "daily" in key and "Wh" in variable.unit:
+                payload['device_class'] = "energy"
+                payload['state_class'] = "total_increasing"
+            elif "total" in key and "Wh" in variable.unit:
+                payload['device_class'] = "energy"
+                payload['state_class'] = "total"
+            elif "Wh" in variable.unit:
+                payload['device_class'] = "energy"
+                payload['state_class'] = "measurement"
+            elif "V" in variable.unit:
+                payload['device_class'] = "voltage"
+                payload['state_class'] = "measurement"
+            elif "A" in variable.unit:
+                payload['device_class'] = "current"
+                payload['state_class'] = "measurement"
+            elif "W" in variable.unit:
+                payload['device_class'] = "power"
+                payload['state_class'] = "measurement"
+            elif "°C" in variable.unit:
+                payload['device_class'] = "temperature"
+                payload['state_class'] = "measurement"
+            elif key == "battery_percentage":
+                payload['device_class'] = "battery"
+                payload['state_class'] = "measurement"
+            elif "timing_period" in key or "delay" in key or "total_light_time" in key:
+                payload['device_class'] = "duration"
+
+            if variable.binary_payload:
+                on, off = variable.binary_payload
+                payload["payload_on"] = on
+                payload["payload_off"] = off
+
+            # Handle writable entities
+            if FunctionCodes.WRITE_MEMORY_SINGLE.value in variable.function_codes or \
+               FunctionCodes.WRITE_STATUS_REGISTER.value in variable.function_codes:
+                payload["command_topic"] = command_topic
+
+            # Publish the MQTT Discovery payload
+            await super().publish(config_topic, payload=json.dumps(payload), retain=True)
+
+    async def publish(self, results: ResultContainer):
+        await self.store_config(results)
+        # Publish each item in the details dictionary to its own MQTT topic
+        for key, result in results.items():
+            state_topic = self.get_state_topic(result)
+            is_writable = FunctionCodes.WRITE_MEMORY_SINGLE.value in result.function_codes or \
+                          FunctionCodes.WRITE_STATUS_REGISTER.value in result.function_codes
+
+            # Publish the entity state
+            await super().publish(state_topic, payload=str(result.value), retain=is_writable)
+
+    async def subscribe(self, variables: VariableContainer):
+        for key, variable in variables.items():
+            if FunctionCodes.WRITE_MEMORY_SINGLE.value in variable.function_codes or \
+               FunctionCodes.WRITE_STATUS_REGISTER.value in variable.function_codes:
+                if key in self.subscribed_names:
+                    continue
+                self.subscribed_names.add(key)
+                platform = self.get_platform(variable)
+                command_topic = self.get_command_topic(variable)
+                print(f"Subscribing to homeassistant commands for {platform} {variable.name}")
+                await super().subscribe(topic=command_topic, qos=2)
+    
+    async def get_command(self) -> Result:
+        message = await anext(self.messages)
+        match = re.match(rf"^{self.base_topic}/\w+/{self.sensor_name}/(\w+)/", message.topic.value)
+        variable_name = match.group(1)
+        variable = variables[variable_name]
+        value = str(message.payload, encoding="utf8")
+        return Result(**vars(variable), value=value)

+ 252 - 0
src/protocol.py

@@ -0,0 +1,252 @@
+import struct
+from dataclasses import dataclass
+from typing import Any, List, Union, Tuple, Optional
+
+from .variables import variables, Variable, FunctionCodes
+from .crc import crc16
+
+type Value = str|int|float
+
+@dataclass
+class Result(Variable):
+    value: Value
+
+class ResultContainer:
+    def __init__(self, results: List[Result]):
+        self._results = results
+        self._result_map = {res.name: res for res in results}
+
+    def __getitem__(self, key: Union[int, str, slice]) -> Result:
+        if isinstance(key, int):
+            return self._results[key]
+        elif isinstance(key, str):
+            return self._result_map[key]
+        elif isinstance(key, slice):
+            return ResultContainer(self._results[key])
+        else:
+            raise TypeError("Key must be an integer index or a result name string.")
+
+    def __len__(self):
+        return len(self._results)
+
+    def __iter__(self):
+        return iter(self._results)
+    
+    def __add__(self, other) -> list[Result]:
+        return ResultContainer(self._results + other._results)
+    
+    def __bool__(self):
+        return len(self._results) > 0
+
+    def items(self):
+        return self._result_map.items()
+
+    def get(self, key: str) -> Optional[Result]:
+        if isinstance(key, str):
+            return self._result_map.get(key)
+        else:
+            raise TypeError("Key must be a variable name string.")
+
+class LumiaxClient:
+    def __init__(self):
+        self.device_id = 0xFE
+
+    def bytes_to_value(self, variable: Variable, buffer: bytes, offset: int) -> Value:
+        if variable.is_32_bit and variable.is_signed:
+            raw_value = struct.unpack_from(">H", buffer, offset)[0] | struct.unpack_from(">h", buffer, offset + 2)[0] << 16
+        elif variable.is_32_bit:
+            raw_value = struct.unpack_from(">H", buffer, offset)[0] | struct.unpack_from(">H", buffer, offset + 2)[0] << 16
+        elif variable.is_signed:
+            raw_value = struct.unpack_from(">h", buffer, offset)[0]
+        else:
+            raw_value = struct.unpack_from(">H", buffer, offset)[0]
+
+        if variable.multiplier:
+            value = raw_value / variable.multiplier
+        elif variable.func:
+            try:
+                value = variable.func(raw_value)
+            except IndexError as e:
+                raise Exception(f"unexpected value for {variable.name} ({hex(variable.address)}): '{raw_value}'")
+        else:
+            value = raw_value
+        return value
+
+    def value_to_bytes(self, variable: Variable, buffer: bytearray, offset: int, value: Value) -> int:
+        if variable.multiplier and not variable.func:
+            raw_value = round(float(value) * variable.multiplier)
+        elif variable.func:
+            raw_value = self._find_raw_value_by_brute_force(variable, value)
+            if raw_value == None:
+                raise Exception(f"invalid value for {variable.name}: '{value}'")
+        elif variable.binary_payload and value == variable.binary_payload[0]:
+            raw_value = 0xFF00
+        elif variable.binary_payload and value == variable.binary_payload[1]:
+            raw_value = 0
+        elif variable.binary_payload:
+            raise Exception(f"invalid binary value for {variable.name}: '{value}'")
+        else:
+            raw_value = int(value)
+
+        if variable.is_32_bit and variable.is_signed:
+            struct.pack_into(">H", buffer, offset, raw_value & 0xFFFF)
+            struct.pack_into(">h", buffer, offset + 2, raw_value >> 16)
+        elif variable.is_32_bit:
+            struct.pack_into(">H", buffer, offset, raw_value & 0xFFFF)
+            struct.pack_into(">H", buffer, offset + 2, raw_value >> 16)
+        elif variable.is_signed:
+            struct.pack_into(">h", buffer, offset, raw_value)
+        else:
+            struct.pack_into(">H", buffer, offset, raw_value)
+
+        length = 4 if variable.is_32_bit else 2
+        return offset + length
+
+    def get_read_command(self, device_id: int, start_address: int, count: int) -> bytes:
+        items = [v for v in variables if v.address >= start_address and v.address < start_address + count]
+        if not items:
+            raise Exception(f"the range {hex(start_address)}-{hex(start_address+count-1)} contains no variables")
+        
+        function_code = items[0].function_codes[0]
+        if not all(function_code in v.function_codes for v in items):
+            raise Exception(f"the range {hex(start_address)}-{hex(start_address+count-1)} spans multiple function codes")
+
+        result = bytes([
+            device_id, 
+            function_code,
+            start_address >> 8,
+            start_address & 0xFF,
+            count >> 8,
+            count & 0xFF
+        ])
+        return result + crc16(result)
+
+    def get_write_command(self, device_id: int, results: list[Result]) -> Tuple[int, bytes]:
+        if not results:
+            raise Exception(f"values list is empty")
+        results.sort(key=lambda x: x.address)
+        address = results[0].address
+        for result in results:
+            if result.value is None:
+                raise Exception(f"value of {result.name} ({hex(result.address)}) is empty")
+            if address < result.address:
+                raise Exception(f"variables are not continuous at {hex(result.address)}")
+            address = result.address + (2 if result.is_32_bit else 1)
+
+        start_variable = results[0]
+        end_variable = results[-1]
+        start_address = start_variable.address
+        end_address = end_variable.address + (1 if end_variable.is_32_bit else 0)
+        count = end_address - start_address + 1
+        byte_count = count * 2
+        if byte_count > 255:
+            raise Exception(f"address range is too large")
+
+        if count > 1:
+            function_code = FunctionCodes.WRITE_MEMORY_RANGE
+            header = bytes([
+                device_id,
+                function_code.value,
+                start_address >> 8,
+                start_address & 0xFF,
+                count >> 8,
+                count & 0xFF,
+                byte_count,
+            ])
+        else:
+            if FunctionCodes.WRITE_STATUS_REGISTER.value in results[0].function_codes:
+                function_code = FunctionCodes.WRITE_STATUS_REGISTER
+            else:
+                function_code = FunctionCodes.WRITE_MEMORY_SINGLE
+            header = bytes([
+                device_id,
+                function_code.value,
+                start_address >> 8,
+                start_address & 0xFF,
+            ])
+
+        if not all(function_code.value in x.function_codes for x in results):
+            raise Exception(f"function code {function_code.name} is not supported for all addresses")
+
+        data = bytearray(byte_count)
+        for result in results:
+            offset = (result.address - start_address) * 2
+            self.value_to_bytes(result, data, offset, result.value)
+
+        result = header + bytes(data)
+        return start_address, result + crc16(result)
+
+    def is_complete(self, buffer: bytes) -> bool:
+        if len(buffer) < 4:
+            return False
+        device_id = buffer[0]
+        if not buffer[1] in FunctionCodes._value2member_map_:
+            return False
+        function_code = FunctionCodes(buffer[1])
+        if function_code in [FunctionCodes.READ_MEMORY, FunctionCodes.READ_PARAMETER, FunctionCodes.READ_STATUS_REGISTER]:
+            data_length = buffer[2]
+            return len(buffer) >= data_length + 5
+        else:
+            return len(buffer) >= 8
+
+    def parse(self, start_address: int, buffer: bytes) -> ResultContainer:
+        self.device_id = buffer[0]
+        function_code = FunctionCodes(buffer[1])
+        results = []
+        if function_code in [FunctionCodes.READ_MEMORY, FunctionCodes.READ_PARAMETER, FunctionCodes.READ_STATUS_REGISTER]:
+            data_length = buffer[2]
+            received_crc = buffer[3+data_length:3+data_length+2]
+            calculated_crc = crc16(buffer[:3+data_length])
+            if received_crc != calculated_crc:
+                raise Exception(f"CRC mismatch (0x{calculated_crc.hex()} != 0x{received_crc.hex()})")
+
+            address = start_address
+            cursor = 3
+            while cursor < data_length + 3:
+                items = [v for v in variables if address == v.address and function_code.value in v.function_codes]
+                for variable in items:
+                    value = self.bytes_to_value(variable, buffer, cursor)
+                    results.append(Result(**vars(variable), value=value))
+                cursor += 2
+                address += 1
+        else:
+            address = struct.unpack_from('>H', buffer, 2)[0]
+            if address != start_address:
+                raise Exception(f"Write result address mismatch ({hex(address)} != {hex(start_address)})")
+            received_crc = buffer[6:8]
+            calculated_crc = crc16(buffer[:6])
+            if received_crc != calculated_crc:
+                raise Exception(f"CRC mismatch (0x{calculated_crc.hex()} != 0x{received_crc.hex()})")
+            
+            if function_code in [FunctionCodes.WRITE_MEMORY_SINGLE, FunctionCodes.WRITE_STATUS_REGISTER]:
+                variable = [v for v in variables if address == v.address and function_code.value in v.function_codes][0]
+                value = self.bytes_to_value(variable, buffer, 4)
+                results.append(Result(**vars(variable), value=value))
+
+        return ResultContainer(results)
+
+    def _find_raw_value_by_brute_force(self, variable: Variable, value: str):
+        if variable.multiplier:
+            value = float(value) * variable.multiplier
+        n_bits = 32 if variable.is_32_bit else 16
+        if variable.is_signed:
+            for i in range(0, 2**(n_bits-1) + 1):
+                try:
+                    if variable.func(i) == value:
+                        return i
+                except IndexError:
+                    pass
+            for i in range(0, -2**(n_bits-1) - 2, -1):
+                try:
+                    if variable.func(i) == value:
+                        return i
+                except IndexError:
+                    pass
+        else:
+            for i in range(0, 2**n_bits + 1):
+                try:
+                    if variable.func(i) == value:
+                        return i
+                except IndexError:
+                    pass
+        return None

+ 405 - 0
src/variables.py

@@ -0,0 +1,405 @@
+from dataclasses import dataclass
+from enum import Enum
+from typing import Callable, List, Tuple, Union, Optional
+
+
+class FunctionCodes(Enum):
+    # Read
+    READ_STATUS_REGISTER  = 0x02 # Read the switch input status
+    READ_PARAMETER        = 0x03 # Read multiple hold registers
+    READ_MEMORY           = 0x04 # Read input register
+    # Write
+    WRITE_STATUS_REGISTER = 0x05 # Write single register
+    WRITE_MEMORY_SINGLE   = 0x06 # Write single hold register
+    WRITE_MEMORY_RANGE    = 0x10 # Write multiple hold registers
+
+@dataclass
+class Variable():
+    address: int
+    is_32_bit: bool
+    is_signed: bool
+    function_codes: list[int]
+    unit: str
+    multiplier: int
+    name: str
+    friendly_name: str
+    func: Union[Callable[int, str], None]
+    binary_payload: Union[Tuple[str, str], None]
+
+class VariableContainer:
+    def __init__(self, variables: List[Variable]):
+        self._variables = variables
+        self._variable_map = {var.name: var for var in variables}
+
+    def __getitem__(self, key: Union[int, str, slice]) -> Variable:
+        if isinstance(key, int):
+            return self._variables[key]
+        elif isinstance(key, str):
+            return self._variable_map[key]
+        elif isinstance(key, slice):
+            return VariableContainer(self._variables[key])
+        else:
+            raise TypeError("Key must be an integer index, a variable name string, or a slice.")
+
+    def __len__(self):
+        return len(self._variables)
+
+    def __iter__(self):
+        return iter(self._variables)
+    
+    def __add__(self, other):
+        return VariableContainer(self._variables + other._variables)
+
+    def __bool__(self):
+        return len(self._variables) > 0
+
+    def items(self):
+        return self._variable_map.items()
+    
+    def get(self, key: str) -> Optional[Variable]:
+        if isinstance(key, str):
+            return self._variable_map.get(key)
+        else:
+            raise TypeError("Key must be a variable name string.")
+
+def _get_functional_status_registers(function_codes: list[int], offset: int):
+    return [
+        # Controller functional status 1
+        Variable(offset, False, False, function_codes, "", 0, "maximum_system_voltage_level", "Maximum system voltage level",
+            lambda x: ["", "12V", "24V", "36V", "48V"][(x >> 12) & 0xF], None),
+        Variable(offset, False, False, function_codes, "", 0, "minimum_system_voltage_level", "Minimum system voltage level",
+            lambda x: ["", "12V", "24V", "36V", "48V"][(x >>  8) & 0xF], None),
+        Variable(offset, False, False, function_codes, "", 0, "controller_series", "Controller Series",
+            lambda x: ["MT series", "DC series", "SMR series"][(x >>  4) & 0xF], None),
+        Variable(offset, False, False, function_codes, "", 0, "battery_type", "Battery type",
+            lambda x: ["Lithium battery", "Non Lithium battery"][(x >>  0) & 0xF], None),
+
+        # Controller functional status 2
+        Variable(offset + 1, False, False, function_codes, "", 0, "infrared_function_available", "Is infrared function available",
+            lambda x: (x >> 15) & 1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, function_codes, "", 0, "automatic_power_reduction_available", "Is automatic power reduction setting available(only in 365 mode)",
+            lambda x: (x >> 14) & 1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, function_codes, "", 0, "charging_at_zero_celsius_available", "Is 0°C prohibit charging setting available",
+            lambda x: (x >> 13) & 1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, function_codes, "", 0, "grade_of_rated_voltage_available", "Is grade of rated voltage setting available",
+            lambda x: (x >> 12) & 1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, function_codes, "", 0, "overcharge_recovery_voltage_available", "Is overcharge recovery voltage setting available (only lithium battery)",
+            lambda x: (x >> 11) & 1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, function_codes, "", 0, "overcharge_protection_available", "Is overcharge protection setting available (only lithium battery)",
+            lambda x: (x >> 10) & 1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, function_codes, "", 0, "floating_charge_voltage_available", "Is floating charge voltage setting available",
+            lambda x: (x >>  9) & 1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, function_codes, "", 0, "equilibrium_charge_voltage_available", "Is equilibrium charge voltage setting available",
+            lambda x: (x >>  8) & 1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, function_codes, "", 0, "strong_charging_voltage_available", "Is strong charging voltage setting available",
+            lambda x: (x >>  7) & 1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, function_codes, "", 0, "low_voltage_recovery_voltage_available", "Is low voltage recovery setting available",
+            lambda x: (x >>  6) & 1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, function_codes, "", 0, "low_voltage_protection_voltage_available", "Is low voltage protection setting available",
+            lambda x: (x >>  5) & 1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, function_codes, "", 0, "battery_type_available", "Is Battery Type setting available",
+            lambda x: (x >>  4) & 1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, function_codes, "", 0, "backlight_time_available", "Is Backlight Time setting available",
+            lambda x: (x >>  3) & 1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, function_codes, "", 0, "device_time_available", "Is Device Time setting available",
+            lambda x: (x >>  2) & 1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, function_codes, "", 0, "device_id_available", "Is Device ID setting available",
+            lambda x: (x >>  1) & 1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, function_codes, "", 0, "device_password_available", "Is Device password setting available",
+            lambda x: (x >>  0) & 1 == 1, ("True", "False")),
+
+        # Controller functional status 3
+        Variable(offset + 2, False, False, function_codes, "", 0, "six_time_frame_mode_available", "Is Six Time Frame Mode available",
+            lambda x: (x >> 7) & 1 == 1, ("True", "False")),
+        Variable(offset + 2, False, False, function_codes, "", 0, "five_time_frame_mode_available", "Is Five Time Frame Mode available",
+            lambda x: (x >> 6) & 1 == 1, ("True", "False")),
+        Variable(offset + 2, False, False, function_codes, "", 0, "timing_control_mode_available", "Is Timing Control available",
+            lambda x: (x >> 5) & 1 == 1, ("True", "False")),
+        Variable(offset + 2, False, False, function_codes, "", 0, "t0t_mode_available", "Is T0T Mode available",
+            lambda x: (x >> 4) & 1 == 1, ("True", "False")),
+        Variable(offset + 2, False, False, function_codes, "", 0, "fixed_duration_mode_available", "Is Fixed Light Up Duration Mode available",
+            lambda x: (x >> 3) & 1 == 1, ("True", "False")),
+        Variable(offset + 2, False, False, function_codes, "", 0, "d2d_mode_available", "Is D2D Mode available",
+            lambda x: (x >> 2) & 1 == 1, ("True", "False")),
+        Variable(offset + 2, False, False, function_codes, "", 0, "24h_mode_available", "Is 24H Mode available",
+            lambda x: (x >> 1) & 1 == 1, ("True", "False")),
+        Variable(offset + 2, False, False, function_codes, "", 0, "manual_operation_mode_available", "Is Manual Operation Mode available",
+            lambda x: (x >> 0) & 1 == 1, ("True", "False")),
+
+        # Controller functional status 4 (reserved)
+        # Variable(offset + 3, False, False, function_codes, "", 0, "controller_functional_status_4", "Controller functional status 4", None, None),
+    ]
+
+def _get_device_status_registers(offset: int):
+    return [
+        # Battery status
+        Variable(offset, False, False, [0x04], "", 0, "battery_temperature_protection_status", "Battery temperature protection status",
+            lambda x: ["Normal", "High temperature protection"][(x >> 4) & 0x1], None),
+        Variable(offset, False, False, [0x04], "", 0, "battery_voltage_protection_status", "Battery voltage protection status",
+            lambda x: ["Normal", "Over voltage protection", "Voltage is low", "Low voltage protection"][(x >> 0) & 0xF], None),
+
+        # Charge status
+        Variable(offset + 1, False, False, [0x04], "", 0, "solar_panel_charge_disabled", "Is charging manually disabled", 
+            lambda x: (x >> 6) & 0x1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, [0x04], "", 0, "solar_panel_is_night", "Is solar panel night", 
+            lambda x: (x >> 5) & 0x1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, [0x04], "", 0, "solar_panel_charge_over_temperature", "Is charge over temperature", 
+            lambda x: (x >> 4) & 0x1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, [0x04], "", 0, "solar_panel_charge_state", "Solar panel charge status", 
+            lambda x: ["Not charging", "Float charge", "Boost charge", "Equal charge"][(x >> 2) & 0x3], None),
+        Variable(offset + 1, False, False, [0x04], "", 0, "solar_panel_charge_state", "Is charge fault", 
+            lambda x: (x >> 1) & 0x1 == 1, ("True", "False")),
+        Variable(offset + 1, False, False, [0x04], "", 0, "solar_panel_is_charging", "Solar panel is charging", 
+            lambda x: (x >> 0) & 0x1 == 1, ("True", "False")),
+
+        # Discharge status
+        Variable(offset + 2, False, False, [0x04], "", 0, "load_state", "Load status",
+            lambda x: ["Light load", "Moderate load", "Rated load", "Overload"][(x >> 12) & 0x3], None),
+        Variable(offset + 2, False, False, [0x04], "", 0, "output_short_circuit", "Is output short circuit", 
+            lambda x: (x >> 11) & 0x1 == 1, ("True", "False")),
+        Variable(offset + 2, False, False, [0x04], "", 0, "output_hardware_protection", "Is output hardware protection", 
+            lambda x: (x >>  4) & 0x1 == 1, ("True", "False")),
+        Variable(offset + 2, False, False, [0x04], "", 0, "output_open_circuit_protection", "Is output open circuit protection", 
+            lambda x: (x >>  3) & 0x1 == 1, ("True", "False")),
+        Variable(offset + 2, False, False, [0x04], "", 0, "output_over_temperature", "Is output over temperature", 
+            lambda x: (x >>  2) & 0x1 == 1, ("True", "False")),
+        Variable(offset + 2, False, False, [0x04], "", 0, "output_fault", "Is output fault", 
+            lambda x: (x >> 1) & 0x1 == 1, ("True", "False")),
+        Variable(offset + 2, False, False, [0x04], "", 0, "load_is_enabled", "Is load enabled", 
+            lambda x: (x >> 0) & 0x1 == 1, ("True", "False")),
+    ]
+
+real_time_status = VariableContainer([
+    Variable(0x2000, False, False, [0x02], "", 0, "equipment_internal_over_temperature", "Equipment internal over temperature",
+        lambda x: ["Normal", "Over temperature"][x], None),
+    Variable(0x200C, False, False, [0x02], "", 0, "day_or_night", "Day or night",
+        lambda x: ["Day", "Night"][x], None),
+])
+
+status_registers = VariableContainer(
+    _get_functional_status_registers([0x04], 0x3011) + [
+
+    Variable(0x3015, False, False, [0x04], "V", 100, "lvd_min_setting_value", "Low voltage detect min setting value", None, None),
+    Variable(0x3016, False, False, [0x04], "V", 100, "lvd_max_setting_value", "Low voltage detect max setting value", None, None),
+    Variable(0x3017, False, False, [0x04], "V", 100, "lvd_default_setting_value", "Low voltage detect default setting value", None, None),
+    Variable(0x3018, False, False, [0x04], "V", 100, "lvr_min_setting_value", "Low voltage recovery min setting value", None, None),
+    Variable(0x3019, False, False, [0x04], "V", 100, "lvr_max_setting_value", "Low voltage recovery max setting value", None, None),
+    Variable(0x301A, False, False, [0x04], "V", 100, "lvr_default_setting_value", "Low voltage recovery default setting value", None, None),
+    Variable(0x301B, False, False, [0x04], "V", 100, "cvt_min_setting_value", "Charge target voltage min setting value for Li Series controller", None, None),
+    Variable(0x301C, False, False, [0x04], "V", 100, "cvt_max_setting_value", "Charge target voltage max setting value for Li Series controller", None, None),
+    Variable(0x301D, False, False, [0x04], "V", 100, "cvt_default_setting_value", "Charge target voltage default setting value Li Series controller", None, None),
+    Variable(0x301E, False, False, [0x04], "V", 100, "cvr_min_setting_value", "Charge recovery voltage min setting value Li Series controller", None, None),
+    Variable(0x301F, False, False, [0x04], "V", 100, "cvr_max_setting_value", "Charge recovery voltage max setting value Li Series controller", None, None),
+    Variable(0x3020, False, False, [0x04], "V", 100, "cvr_default_setting_value", "Charge recovery voltage default setting value Li Series controller", None, None),
+    Variable(0x3021, False, False, [0x04], "V", 100, "day_night_threshold_voltage_min", "Day/Night threshold voltage min setting value", None, None),
+    Variable(0x3022, False, False, [0x04], "V", 100, "day_night_threshold_voltage_max", "Day/Night threshold voltage max setting value", None, None),
+    Variable(0x3023, False, False, [0x04], "V", 100, "day_night_threshold_voltage_default", "Day/Night threshold voltage default setting value", None, None),
+    Variable(0x3024, False, False, [0x04], "V", 100, "dimming_voltage_min", "Dimming voltage min setting value", None, None),
+    Variable(0x3025, False, False, [0x04], "V", 100, "dimming_voltage_max", "Dimming voltage max setting value", None, None),
+    Variable(0x3026, False, False, [0x04], "V", 100, "dimming_voltage_default", "Dimming voltage default setting value", None, None),
+    Variable(0x3027, False, False, [0x04], "A", 100, "load_current_min", "Load current min setting value", None, None),
+    Variable(0x3028, False, False, [0x04], "A", 100, "load_current_max", "Load current max setting value", None, None),
+    Variable(0x3029, False, False, [0x04], "V", 100, "cvt_cvr_max_dropout_voltage", "Charge target and recovery voltage max allow dropout voltage for Li-series controller", None, None),
+    Variable(0x302A, False, False, [0x04], "V", 100, "cvt_cvr_min_dropout_voltage", "Charge target and recovery voltage min allow dropout voltage for Li-series controller", None, None),
+    Variable(0x302B, False, False, [0x04], "V", 100, "lvd_lvr_min_dropout_voltage", "Low voltage detect and recovery min allow dropout voltage", None, None),
+    Variable(0x302C, False, False, [0x04], "V", 100, "min_allow_dropout_voltage", "CVR and LVD & CVT and LVR Min allow dropout voltage", None, None),
+    Variable(0x3030, False, False, [0x04], "", 1, "equipment_id", "Equipment ID", None, None),
+    Variable(0x3031, False, False, [0x04], "", 1, "run_days", "Number of running days", None, None),
+    Variable(0x3032, False, False, [0x04], "V", 100, "battery_voltage_level", "Current battery voltage level", None, None),
+
+    ] + _get_device_status_registers(0x3033) + [
+
+    Variable(0x3036, False, False, [0x04], "℃", 100, "environment_temperature", "Environment temperature", None, None),
+    Variable(0x3037, False, False, [0x04], "℃", 100, "device_built_in_temperature", "Device built-intemperature", None, None),
+    Variable(0x3038, False, False, [0x04], "", 1, "battery_empty_times", "Battery empty times", None, None),
+    Variable(0x3039, False, False, [0x04], "", 1, "battery_full_times", "Battery full times", None, None),
+    Variable(0x303A, False, False, [0x04], "", 1, "over_voltage_protection_times", "Over-voltage protection times", None, None),
+    Variable(0x303B, False, False, [0x04], "", 1, "over_current_protection_times", "Over-current protection times", None, None),
+    Variable(0x303C, False, False, [0x04], "", 1, "short_circuit_protection_times", "short-circuit protection times", None, None),
+    Variable(0x303D, False, False, [0x04], "", 1, "open_circuit_protection_times", "Open-circuit protection times", None, None),
+    Variable(0x303E, False, False, [0x04], "", 1, "hardware_protection_times", "Hardware protection times", None, None),
+    Variable(0x303F, False, False, [0x04], "", 1, "charge_over_temperature_protection_times", "Charge over-temperature protection times", None, None),
+    Variable(0x3040, False, False, [0x04], "", 1, "discharge_over_temperature_protection_times", "Discharge over-temperature protection time", None, None),
+    Variable(0x3045, False, False, [0x04], "%", 1, "battery_percentage", "Battery remaining capacity", None, None),
+    Variable(0x3046, False, False, [0x04], "V", 100, "battery_voltage", "Battery voltage", None, None),
+    Variable(0x3047, False, True,  [0x04], "A", 100, "battery_current", "Battery current", None, None),
+    Variable(0x3048, True,  True,  [0x04], "W", 100, "battery_power", "Battery power", None, None),
+    Variable(0x304A, False, False, [0x04], "V", 100, "load_voltage", "Load voltage", None, None),
+    Variable(0x304B, False, False, [0x04], "A", 100, "load_current", "Load current", None, None),
+    Variable(0x304C, True,  False, [0x04], "W", 100, "load_power", "Load power", None, None),
+    Variable(0x304E, False, False, [0x04], "V", 100, "solar_panel_voltage", "Solar panel voltage", None, None),
+    Variable(0x304F, False, False, [0x04], "A", 100, "solar_panel_current", "Solar panel current", None, None),
+    Variable(0x3050, True,  False, [0x04], "W", 100, "solar_panel_power", "Solar panel power", None, None),
+    Variable(0x3052, False, False, [0x04], "kWh", 100, "solar_panel_daily_energy", "Daily solar panel energy", None, None),
+    Variable(0x3053, True,  False, [0x04], "kWh", 100, "solar_panel_total_energy", "Total solar panel energy", None, None),
+    Variable(0x3055, True,  False, [0x04], "kWh", 100, "load_daily_energy", "Daily load energy", None, None),
+    Variable(0x3056, True,  False, [0x04], "kWh", 100, "load_total_energy", "Total load energy", None, None),
+    Variable(0x3058, False, False, [0x04], "min", 1, "total_light_time_during_the_day", "Total light time during the day", None, None),
+    Variable(0x309D, False, False, [0x04], "", 1, "run_days", "Number of running days", None, None),
+    Variable(0x30A0, False, False, [0x04], "V", 100, "battery_voltage", "Battery voltage", None, None),
+    Variable(0x30A1, False, True,  [0x04], "A", 100, "battery_current", "Battery current", None, None),
+    Variable(0x30A2, False, False, [0x04], "℃", 100, "environment_temperature", "Environment temperature", None, None),
+
+    ] + _get_device_status_registers(0x30A3) + [
+
+    Variable(0x30A6, False, False, [0x04], "", 1, "battery_empty_times", "Battery empty times", None, None),
+    Variable(0x30A7, False, False, [0x04], "", 1, "battery_full_times", "Battery full times", None, None),
+    Variable(0x30A8, False, False, [0x04], "V", 100, "battery_daily_voltage_maximum", "Highest battery voltage today", None, None),
+    Variable(0x30A9, False, False, [0x04], "V", 100, "battery_daily_voltage_minimum", "Lowest battery voltage today", None, None),
+    Variable(0x3125, False, False, [0x04], "V", 100, "load_voltage", "Load voltage", None, None),
+    Variable(0x3126, False, False, [0x04], "A", 100, "load_current", "Load current", None, None),
+    Variable(0x3127, True,  False, [0x04], "W", 100, "load_power", "Load power", None, None),
+    Variable(0x3129, False, False, [0x04], "kWh", 100, "load_daily_energy", "Daily load energy", None, None),
+    Variable(0x312E, True,  False, [0x04], "kWh", 100, "load_total_energy", "Total load energy", None, None),
+    Variable(0x316C, False, False, [0x04], "", 1, "run_days", "The number of running days", None, None),
+])
+
+rated_parameters = VariableContainer([
+    Variable(0x3000, False, False, [0x04], "V", 100, "solar_panel_rated_voltage", "Solar panel rated voltage", None, None),
+    Variable(0x3001, False, False, [0x04], "A", 100, "solar_panel_rated_current", "Solar panel rated current", None, None),
+    Variable(0x3002, True,  False, [0x04], "W", 100, "solar_panel_rated_power", "Solar panel rated power", None, None),
+    Variable(0x3004, False, False, [0x04], "V", 100, "battery_rated_voltage", "Battery rated voltage", None, None),
+    Variable(0x3005, False, False, [0x04], "A", 100, "battery_rated_current", "Battery rated current", None, None),
+    Variable(0x3006, True,  False, [0x04], "W", 100, "battery_rated_power", "Battery rated power", None, None),
+    Variable(0x3008, False, False, [0x04], "V", 100, "load_rated_voltage", "Load rated voltage", None, None),
+    Variable(0x3009, False, False, [0x04], "A", 100, "load_rated_current", "Load rated current", None, None),
+    Variable(0x300A, True,  False, [0x04], "W", 100, "load_rated_power", "Load rated power", None, None),
+])
+
+read_only_registers = VariableContainer(
+    _get_functional_status_registers([0x03], 0x8FF0) + [
+
+    Variable(0x8FF4, False, False, [0x03], "V", 100, "lvd_min_setting_value", "Low voltage detect min setting value", None, None),
+    Variable(0x8FF5, False, False, [0x03], "V", 100, "lvd_max_setting_value", "Low voltage detect max setting value", None, None),
+    Variable(0x8FF6, False, False, [0x03], "V", 100, "lvd_default_setting_value", "Low voltage detect default setting value", None, None),
+    Variable(0x8FF7, False, False, [0x03], "V", 100, "lvr_min_setting_value", "Low voltage recovery min setting value", None, None),
+    Variable(0x8FF8, False, False, [0x03], "V", 100, "lvr_max_setting_value", "Low voltage recovery max setting value", None, None),
+    Variable(0x8FF9, False, False, [0x03], "V", 100, "lvr_default_setting_value", "Low voltage recovery default setting value", None, None),
+    Variable(0x8FFA, False, False, [0x03], "V", 100, "cvt_min_setting_value", "Charge target voltage min setting value for Li Series controller", None, None),
+    Variable(0x8FFB, False, False, [0x03], "V", 100, "cvt_max_setting_value", "Charge target voltage max setting value Li Series controller", None, None),
+    Variable(0x8FFC, False, False, [0x03], "V", 100, "cvt_default_setting_value", "Charge target voltage default setting value Li Series controller", None, None),
+    Variable(0x8FFD, False, False, [0x03], "V", 100, "cvr_min_setting_value", "Charge recovery voltage min setting value Li Series controller", None, None),
+    Variable(0x8FFE, False, False, [0x03], "V", 100, "cvr_max_setting_value", "Charge recovery voltage max setting value Li Series controller", None, None),
+    Variable(0x8FFF, False, False, [0x03], "V", 100, "cvr_default_setting_value", "Charge recovery voltage default setting value Li Series controller", None, None),
+    Variable(0x9000, False, False, [0x03], "V", 100, "day_night_threshold_voltage_min", "Day/Night threshold voltage min setting value", None, None),
+    Variable(0x9001, False, False, [0x03], "V", 100, "day_night_threshold_voltage_max", "Day/Night threshold voltage max setting value", None, None),
+    Variable(0x9002, False, False, [0x03], "V", 100, "day_night_threshold_voltage_default", "Day/Night threshold voltage default setting value", None, None),
+    Variable(0x9003, False, False, [0x03], "V", 100, "dimming_voltage_min", "Dimming voltage min setting value", None, None),
+    Variable(0x9004, False, False, [0x03], "V", 100, "dimming_voltage_max", "Dimming voltage max setting value", None, None),
+    Variable(0x9005, False, False, [0x03], "V", 100, "dimming_voltage_default", "Dimming voltage default setting value", None, None),
+    Variable(0x9006, False, False, [0x03], "A", 100, "load_current_min", "Load current min setting value", None, None),
+    Variable(0x9007, False, False, [0x03], "A", 100, "load_current_max", "Load current max setting value", None, None),
+
+    Variable(0x9008, False, False, [0x03], "V", 100, "battery_voltage_level", "Current battery voltage level", None, None),
+    Variable(0x9009, False, False, [0x03], "V", 100, "cvt_cvr_max_dropout_voltage", "Charge target and recovery voltage max allow dropout voltage for Li-series controller", None, None),
+    Variable(0x900A, False, False, [0x03], "V", 100, "cvt_cvr_min_dropout_voltage", "Charge target and recovery voltage min allow dropout voltage for Li-series controller", None, None),
+    Variable(0x900B, False, False, [0x03], "V", 100, "lvd_lvr_min_dropout_voltage", "Low voltage detect and recovery min allow dropout voltage", None, None),
+    Variable(0x900C, False, False, [0x03], "V", 100, "min_allow_dropout_voltage", "CVR and LVD & CVT and LVR Min allow dropout voltage", None, None),
+])
+
+device_parameters = VariableContainer([
+    Variable(0x9017, False, False, [0x03, 0x06, 0x10], "ss", 1, "real_time_clock_second", "Real-time clock second", None, None),
+    Variable(0x9018, False, False, [0x03, 0x06, 0x10], "mm", 1, "real_time_clock_minute", "Real-time clock minute", None, None),
+    Variable(0x9019, False, False, [0x03, 0x06, 0x10], "hh", 1, "real_time_clock_hour", "Real-time clock hour", None, None),
+    Variable(0x901A, False, False, [0x03, 0x06, 0x10], "dd", 1, "real_time_clock_day", "Real-time clock day", None, None),
+    Variable(0x901B, False, False, [0x03, 0x06, 0x10], "MM", 1, "real_time_clock_month", "Real-time clock month", None, None),
+    Variable(0x901C, False, False, [0x03, 0x06, 0x10], "yy", 1, "real_time_clock_year", "Real-time clock year (00-99)", None, None),
+    Variable(0x901D, False, False, [0x03, 0x06, 0x10], "baud", 0, "baud_rate", "Baud rate",
+        lambda x: ["4800", "9600", "19200", "57600", "115200"][x & 0xF], None),
+    Variable(0x901E, False, False, [0x03, 0x06, 0x10], "s", 1, "backlight_time", "Backlight time", None, None),
+    Variable(0x901F, False, False, [0x03, 0x06, 0x10], "", 0, "device_password", "Device password",
+        lambda x: str(max((x>>12) & 0xF, 9)) + 
+                    str(max((x>> 8) & 0xF, 9)) + 
+                    str(max((x>> 4) & 0xF, 9)) + 
+                    str(max((x>> 0) & 0xF, 9)), None),
+    Variable(0x9020, False, False, [0x03, 0x06, 0x10], "", 1, "slave_id", "Slave ID", None, None),
+])
+
+battery_and_load_parameters = VariableContainer([
+    Variable(0x9021, False, False, [0x03, 0x06, 0x10], "", 0, "battery_type", "Battery type",
+        lambda x: ["Lithium", "Liquid", "GEL", "AGM"][(x >>  0) & 0xF], None),
+    Variable(0x9022, False, False, [0x03, 0x06, 0x10], "V", 100, "low_voltage_protection_voltage", "Low voltage protection",  None, None),
+    Variable(0x9023, False, False, [0x03, 0x06, 0x10], "V", 100, "low_voltage_recovery_voltage", "Low voltage recovery", None, None),
+    Variable(0x9024, False, False, [0x03, 0x06, 0x10], "V", 100, "boost_voltage", "Boost voltage", None, None),
+    Variable(0x9025, False, False, [0x03, 0x06, 0x10], "V", 100, "equalizing_voltage", "Equalizing voltage", None, None),
+    Variable(0x9026, False, False, [0x03, 0x06, 0x10], "V", 100, "float_voltage", "Float voltage", None, None),
+    Variable(0x9027, False, False, [0x03, 0x06, 0x10], "", 0, "system_rated_voltage_level", "System rated voltage level",
+        lambda x: ["Auto", "12V", "24V", "36V", "48V", "60V", "110V", "120V", "220V", "240V"][x], None),
+    Variable(0x9028, False, False, [0x03, 0x06, 0x10], "V", 100, "charge_target_voltage_for_lithium", "Charge target voltage for lithium", None, None),
+    Variable(0x9029, False, False, [0x03, 0x06, 0x10], "V", 100, "charge_recovery_voltage_for_lithium", "Charge recovery voltage for lithium", None, None),
+    Variable(0x902A, False, False, [0x03, 0x06, 0x10], "", 0, "charging_at_zero_celsius", "0°C charging",
+        lambda x: ["Normal charging", "No charging", "Slow charging"][x & 0xF], None),
+    Variable(0x902B, False, False, [0x03, 0x06, 0x10], "", 0, "mt_series_load_mode", "Load mode for MT series controller",
+        lambda x: (["Always on", "Dusk to dawn"] + 
+                    [f"Night light on time {n} hours" for n in range(2, 10)] + 
+                    ["Manual", "T0T", "Timing switch"])[x], None),
+    Variable(0x902C, False, False, [0x03, 0x06, 0x10], "", 0, "mt_series_manual_control_default", "MT Series manual control mode default setting",
+        lambda x: ["On", "Off"][x], ("On", "Off")),
+    Variable(0x902D, False, False, [0x03, 0x06, 0x10], "min", 1, "mt_series_timing_period_1", "MT Series timing opening period 1",
+        lambda x: ((x >> 8) & 0xFF) * 60 + max(x & 0xFF, 59), None),
+    Variable(0x902E, False, False, [0x03, 0x06, 0x10], "min", 1, "mt_series_timing_period_2", "MT Series timing opening period 2",
+        lambda x: ((x >> 8) & 0xFF) * 60 + max(x & 0xFF, 59), None),
+    Variable(0x902F, False, False, [0x03, 0x06, 0x10], "sec", 1, "timed_start_time_1_seconds", "Timed start time 1-seconds", None, None),
+    Variable(0x9030, False, False, [0x03, 0x06, 0x10], "min", 1, "timed_start_time_1_minutes", "Timed start time 1-minute", None, None),
+    Variable(0x9031, False, False, [0x03, 0x06, 0x10], "hour", 1, "timed_start_time_1_hours", "Timed start time 1-hour", None, None),
+    Variable(0x9032, False, False, [0x03, 0x06, 0x10], "sec", 1, "timed_off_time_1_seconds", "Timed off time 1-seconds", None, None),
+    Variable(0x9033, False, False, [0x03, 0x06, 0x10], "min", 1, "timed_off_time_1_minutes", "Timed off time 1-minute", None, None),
+    Variable(0x9034, False, False, [0x03, 0x06, 0x10], "hour", 1, "timed_off_time_1_hours", "Timed off time 1-hour", None, None),
+    Variable(0x9035, False, False, [0x03, 0x06, 0x10], "sec", 1, "timed_start_time_2_seconds", "Timed start time 2-seconds", None, None),
+    Variable(0x9036, False, False, [0x03, 0x06, 0x10], "min", 1, "timed_start_time_2_minutes", "Timed start time 2-minute", None, None),
+    Variable(0x9037, False, False, [0x03, 0x06, 0x10], "hour", 1, "timed_start_time_2_hours", "Timed start time 2-hour", None, None),
+    Variable(0x9038, False, False, [0x03, 0x06, 0x10], "sec", 1, "timed_off_time_2_seconds", "Timed off time 2-seconds", None, None),
+    Variable(0x9039, False, False, [0x03, 0x06, 0x10], "min", 1, "timed_off_time_2_minutes", "Timed off time 2-minute", None, None),
+    Variable(0x903A, False, False, [0x03, 0x06, 0x10], "hour", 1, "timed_off_time_2_hours", "Timed off time 2-hour", None, None),
+    Variable(0x903B, False, False, [0x03, 0x06, 0x10], "", 0, "time_control_period_selection", "Time control period selection",
+        lambda x: ["1 period", "2 periods"][x], None),
+    Variable(0x903C, False, False, [0x03, 0x06, 0x10], "V", 100, "light_controlled_dark_voltage", "Light controlled dark voltage", None, None),
+    Variable(0x903D, False, False, [0x03, 0x06, 0x10], "min", 1, "day_night_delay_time", "Day/Night delay time", None, None),
+    Variable(0x903E, False, False, [0x03, 0x06, 0x10], "%", 0.1, "dc_series_timing_control_time_1_dimming", "DC series timing control time 1 dimming", None, None),
+    Variable(0x903F, False, False, [0x03, 0x06, 0x10], "%", 0.1, "dc_series_timing_control_time_2_dimming", "DC series timing control time 2 dimming", None, None),
+    Variable(0x9040, False, False, [0x03, 0x06, 0x10], "min", 1.0/30, "dc_series_time_1", "DC Series time 1", None, None),
+    Variable(0x9041, False, False, [0x03, 0x06, 0x10], "%", 0.1, "dc_series_time_1_dimming", "DC Series the time 1 dimming", None, None),
+    Variable(0x9042, False, False, [0x03, 0x06, 0x10], "min", 1.0/30, "dc_series_time_2", "DC Series time 2", None, None),
+    Variable(0x9043, False, False, [0x03, 0x06, 0x10], "%", 0.1, "dc_series_time_2_dimming", "DC Series the time 2 dimming", None, None),
+    Variable(0x9044, False, False, [0x03, 0x06, 0x10], "sec", 1.0/30, "dc_series_time_3", "DC Series time 3", None, None),
+    Variable(0x9045, False, False, [0x03, 0x06, 0x10], "%", 0.1, "dc_series_time_3_dimming", "DC Series the time 3 dimming", None, None),
+    Variable(0x9046, False, False, [0x03, 0x06, 0x10], "sec", 1.0/30, "dc_series_time_4", "DC Series time 4", None, None),
+    Variable(0x9047, False, False, [0x03, 0x06, 0x10], "%", 0.1, "dc_series_time_4_dimming", "DC Series the time 4 dimming", None, None),
+    Variable(0x9048, False, False, [0x03, 0x06, 0x10], "sec", 1.0/30, "dc_series_time_5", "DC Series time 5", None, None),
+    Variable(0x9049, False, False, [0x03, 0x06, 0x10], "%", 0.1, "dc_series_time_5_dimming", "DC Series the time 5 dimming", None, None),
+    Variable(0x904A, False, False, [0x03, 0x06, 0x10], "A", 100, "dc_series_load_current_limit", "DC Series load current limit", None, None),
+    Variable(0x904B, False, False, [0x03, 0x06, 0x10], "", 0, "dc_series_auto_dimming", "DC Series auto dimming",
+        lambda x: ["Auto dimming", "365 mode", "No dimming", "No dimming"][x & 0xF], None),
+    Variable(0x904C, False, False, [0x03, 0x06, 0x10], "V", 100, "dc_series_dimming_voltage", "DC Series dimming voltage", None, None),
+    Variable(0x904D, False, False, [0x03, 0x06, 0x10], "%", 1, "dc_series_dimming_percentage", "DC Series dimming percentage", None, None),
+    Variable(0x904E, False, False, [0x03, 0x06, 0x10], "sec", 0.1, "sensing_delay_off_time", "Sensing delay off time", None, None),
+    Variable(0x904F, False, False, [0x03, 0x06, 0x10], "%", 0.1, "infrared_dimming_when_no_people", "Dimming of Infrared Series controller when no people", None, None),
+    Variable(0x9052, False, False, [0x03, 0x06, 0x10], "", 0, "light_controlled_switch", "Light controlled switch", None, ("On", "Off")),
+    Variable(0x9053, False, False, [0x03, 0x06, 0x10], "V", 100, "light_controlled_daybreak_voltage", "Light-control led daybreak voltage", None, None),
+    Variable(0x9054, False, False, [0x03, 0x06, 0x10], "%", 1, "dimming_percentage", "Dimming percentage for load test", None, None),
+    Variable(0x9069, False, False, [0x03, 0x06, 0x10], "A", 100, "maximum_charging_current_setting", "Maximum charging current setting", None, None),
+    Variable(0x906A, False, False, [0x03, 0x06, 0x10], "℃", 100, "over_temperature_protection", "Over temperature protection", None, None),
+])
+
+switches = VariableContainer([
+    Variable(0x0000, False, False, [0x05], "", 0, "manual_control_switch", "Manual control switch", None, ("On", "Off")),
+    Variable(0x0001, False, False, [0x05], "", 0, "test_key_trigger", "Test key on/off", None, ("On", "Off")),
+    Variable(0x0002, False, False, [0x05], "", 0, "dc_series_timing_control_mode_switch", "DC Series timing control mode switch", None, ("On", "Off")),
+    Variable(0x0003, False, False, [0x05], "", 0, "manual_control_charging_switch", "Manual control charging switch", None, ("On", "Off")),
+
+    Variable(0x0008, False, False, [0x05], "", 0, "restore_system_default_values", "Restore system default values", None, ("Restore", "")),
+    Variable(0x0009, False, False, [0x05], "", 0, "clear_device_statistics", "Clear running days, Power generation or consumption WH and historical minimum/maximum voltage", None, ("Clear", "")),
+    Variable(0x000A, False, False, [0x05], "", 0, "clear_counters", "Clear all protection and fully charged times", None, ("Clear", "")),
+    Variable(0x000B, False, False, [0x05], "", 0, "Clear_charge_discharge_ah", "Clear charge/discharge AH", None, ("Clear", "")),
+    Variable(0x000C, False, False, [0x05], "", 0, "clear_all", "Clear all of the above historical data", None, ("Clear", "")),
+])
+
+variables = (
+    real_time_status +
+    status_registers +
+    rated_parameters +
+    read_only_registers +
+    device_parameters +
+    battery_and_load_parameters +
+    switches
+)

+ 9 - 0
tests/__main__.py

@@ -0,0 +1,9 @@
+import unittest
+
+from .compatibility_test import TestCompat
+from .transaction_test import TestTransaction
+from .variable_test import TestVariables
+from .main_test import TestMain
+
+if __name__ == "__main__":
+    unittest.main()

+ 70 - 0
tests/compatibility_test.py

@@ -0,0 +1,70 @@
+from itertools import groupby
+
+import unittest
+import sys
+sys.path.append("..")
+
+from src.variables import variables
+
+class TestCompat(unittest.TestCase):
+    def test_includes_names(self):
+        names = [
+            "equipment_id",
+            "run_days",
+            "solar_panel_is_charging",
+            "solar_panel_is_night",
+            "solar_panel_charge_state",
+            "load_is_enabled",
+            "load_state",
+            "battery_empty_times",
+            "battery_full_times",
+            "battery_percentage",
+            "battery_voltage",
+            "battery_current",
+            "battery_power",
+            "load_voltage",
+            "load_current",
+            "load_power",
+            "solar_panel_voltage",
+            "solar_panel_current",
+            "solar_panel_power",
+            "solar_panel_daily_energy",
+            "solar_panel_total_energy",
+            "load_daily_energy",
+            "load_total_energy",
+        ]
+        for name in names:
+            self.assertIn(name, [v.name for v in variables], f"variable {name} doesn't exist anymore")
+    def test_excludes_names(self):
+        bad_names = [
+            "battery_full_level",
+            "battery_state_1",
+            "battery_state_2",
+            "solar_panel_state",
+            "temperature_1",
+            "temperature_2",
+        ]
+        for name in bad_names:
+            self.assertNotIn(name, [v.name for v in variables], f"variable {name} still exists")
+
+    def test_units(self):
+        unit_mapping = {
+            "battery_percentage": "%",
+            "battery_voltage": "V",
+            "battery_current": "A",
+            "battery_power": "W",
+            "load_voltage": "V",
+            "load_current": "A",
+            "load_power": "W",
+            "solar_panel_voltage": "V",
+            "solar_panel_current": "A",
+            "solar_panel_power": "W",
+            "solar_panel_daily_energy": "kWh",
+            "solar_panel_total_energy": "kWh",
+            "load_daily_energy": "kWh",
+            "load_total_energy": "kWh",
+        }
+        for name, unit in unit_mapping.items():
+            items = [v for v in variables if name == v.name]
+            self.assertTrue(items, f"variable {name} doesn't exist anymore")
+            self.assertEqual(items[0].unit, unit)

+ 9 - 0
tests/main_test.py

@@ -0,0 +1,9 @@
+import unittest
+import sys
+sys.path.append("..")
+
+class TestMain(unittest.TestCase):
+    def test_main(self):
+        import main
+if __name__ == "__main__":
+    unittest.main()

+ 146 - 0
tests/transaction_test.py

@@ -0,0 +1,146 @@
+import unittest
+import sys
+sys.path.append("..")
+
+from src.variables import variables
+from src.protocol import LumiaxClient, Result
+
+class TestTransaction(unittest.TestCase):
+
+    def setUp(self):
+        self.client = LumiaxClient()
+
+    def test_1(self):
+        device_id = 0x01
+        byte_count = 56
+        count = int(byte_count / 2)
+        start_address = 0x3011
+        end_address = 0x302C
+
+        self.assertEqual(end_address - start_address + 1, count)
+
+        send_buf = bytes([0x01, 0x04, 0x30, 0x11, 0x00, 0x1C, 0xAE, 0xC6])
+        self.assertEqual(send_buf, self.client.get_read_command(device_id, start_address, count))
+
+        recv_buf = bytes([0x01, 0x04, 0x38, 0x41, 0x01, 0x13, 0xF7, 0x00, 0x0F, 0x00, 0x00, 0x04, 0x38, 0x04, 0xB0, 0x04, 0x60, 0x04, 0x74, 0x05, 0x00, 0x04,0xB0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x2C, 0x03, 0x20, 0x03, 0x20, 0x00, 0x00, 0x00,0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x3C, 0x00, 0x00, 0xB1, 0xB7])
+        self.assertEqual(len(recv_buf) - 5, byte_count)
+
+        results = self.client.parse(start_address, recv_buf)
+        for result in results:
+            self.assertIsNotNone(result.value, f"{result.name} ({hex(result.address)})")
+        self.assertGreaterEqual(len(results), byte_count / 2)
+        self.assertEqual(hex(results[0].address), hex(start_address))
+        self.assertEqual(hex(results[-1].address), hex(end_address))
+
+    def test_2(self):
+        device_id = 0x01
+        byte_count = 80
+        count = int(byte_count / 2)
+        start_address = 0x3030
+        end_address = start_address + count - 1
+
+        self.assertEqual(end_address - start_address + 1, count)
+
+        send_buf = bytes([0x01, 0x04, 0x30, 0x30, 0x00, 0x28, 0xFF, 0x1B])
+        self.assertEqual(send_buf, self.client.get_read_command(device_id, start_address, count))
+
+        recv_buf = bytes([0x01, 0x04, 0x50, 0x00, 0x01, 0x00, 0x00, 0x09, 0x60, 0x00, 0x00, 0x00, 0x20, 0x00, 0x01, 0x09, 0xC4, 0x0B, 0x54, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1F, 0x09, 0x24, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 0x24, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x44, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x70, 0x04])
+        self.assertEqual(len(recv_buf) - 5, byte_count)
+
+        results = self.client.parse(start_address, recv_buf)
+        for result in results:
+            self.assertIsNotNone(result.value, f"{result.name} ({hex(result.address)})")
+        self.assertGreaterEqual(len(results), byte_count / 2)
+        self.assertEqual(hex(results[0].address), hex(start_address))
+        self.assertEqual(results[-1].is_32_bit, True)
+        self.assertEqual(hex(results[-1].address), hex(end_address-1))
+
+    def test_3(self):
+        device_id = 0x01
+        byte_count = 2
+        count = int(byte_count / 2)
+        start_address = 0X3000
+        end_address = start_address + count - 1
+
+        self.assertEqual(end_address - start_address + 1, count)
+
+        send_buf = bytes([0x01, 0x04, 0x30, 0x00, 0x00, 0x01, 0x3E, 0xCA])
+        self.assertEqual(send_buf, self.client.get_read_command(device_id, start_address, count))
+
+        recv_buf = bytes([0x01, 0x04, 0x02, 0x17, 0x70, 0xB7, 0x24])
+        self.assertEqual(len(recv_buf) - 5, byte_count)
+
+        results = self.client.parse(start_address, recv_buf)
+        for result in results:
+            self.assertIsNotNone(result.value, f"{result.name} ({hex(result.address)})")
+        self.assertGreaterEqual(len(results), byte_count / 2)
+        self.assertEqual(hex(results[0].address), hex(start_address))
+        self.assertEqual(hex(results[-1].address), hex(end_address))
+        self.assertEqual(results[0].value, 60.0)
+
+    def test_4(self):
+        device_id = 0x01
+        byte_count = 58
+        count = int(byte_count / 2)
+        start_address = 0X8FF0
+        end_address = start_address + count - 1
+
+        self.assertEqual(end_address - start_address + 1, count)
+
+        send_buf = bytes([0x01, 0x03, 0x8F, 0xF0, 0x00, 0x1D, 0xAF, 0x24])
+        self.assertEqual(send_buf, self.client.get_read_command(device_id, start_address, count))
+
+        recv_buf = bytes([0x01, 0x03, 0x3A, 0x41, 0x01, 0x13, 0xF7, 0x00, 0x0F, 0x00, 0x00, 0x04, 0x38, 0x04, 0xB0, 0x04, 0x60, 0x04, 0x74, 0x05, 0x00, 0x04, 0xB0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x2C, 0x03, 0x20, 0x03, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 0x60, 0x00, 0x00, 0x00, 0x00, 0x00, 0x3C, 0x00, 0x00, 0x7B, 0xB4])
+        self.assertEqual(len(recv_buf) - 5, byte_count)
+
+        results = self.client.parse(start_address, recv_buf)
+        for result in results:
+            self.assertIsNotNone(result.value, f"{result.name} ({hex(result.address)})")
+        self.assertEqual(hex(results[0].address), hex(start_address))
+        self.assertEqual(hex(results[-1].address), hex(end_address))
+
+    def test_5(self):
+        device_id = 0x01
+        byte_count = 20
+        count = int(byte_count / 2)
+        start_address = 0x9017
+        end_address = start_address + count - 1
+
+        self.assertEqual(end_address - start_address + 1, count)
+
+        send_buf = bytes([0x01, 0x03, 0x90, 0x17, 0x00, 0x0A, 0x58, 0xC9])
+        self.assertEqual(send_buf, self.client.get_read_command(device_id, start_address, count))
+
+        recv_buf = bytes([0x01, 0x03, 0x14, 0x00, 0x34, 0x00, 0x24, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x12, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x45, 0x85])
+        self.assertEqual(len(recv_buf) - 5, byte_count)
+
+        results = self.client.parse(start_address, recv_buf)
+        for result in results:
+            self.assertIsNotNone(result.value, f"{result.name} ({hex(result.address)})")
+        self.assertGreaterEqual(len(results), byte_count / 2)
+        self.assertEqual(hex(results[0].address), hex(start_address))
+        self.assertEqual(hex(results[-1].address), hex(end_address))
+
+    def test_6(self):
+        device_id = 0x01
+        byte_count = 20
+        count = int(byte_count / 2)
+        start_address = 0x9021
+        end_address = start_address + count - 1
+
+        self.assertEqual(end_address - start_address + 1, count)
+
+        items = [v for v in variables if v.address >= start_address and v.address <= end_address]
+        values = ["Lithium", 10.6, 11.8, 14.4, 14.7, 13.6, "Auto", 14.4, 14.0, "Normal charging"]
+        results = [Result(**vars(variable), value=value) for variable, value in (zip(items, values))]
+
+        send_buf = bytes([0x01, 0x10, 0x90, 0x21, 0x00, 0x0A, 0x14, 0x00, 0x00, 0x04, 0x24, 0x04, 0x9C, 0x05, 0xA0, 0x05, 0xBE, 0x05, 0x50, 0x00, 0x00, 0x05, 0xA0, 0x05, 0x78, 0x00, 0x00, 0xCC, 0xE7])
+        start_address, data = self.client.get_write_command(device_id, results)
+        self.assertEqual(send_buf, data)
+
+        recv_buf = bytes([0x01, 0x10, 0x90, 0x21, 0x00, 0x0A, 0x3D, 0x04])
+        self.assertEqual(len(recv_buf) - 4, 4)
+        results = self.client.parse(start_address, recv_buf)
+        self.assertListEqual(list(results), [])
+if __name__ == "__main__":
+    unittest.main()

+ 34 - 0
tests/variable_test.py

@@ -0,0 +1,34 @@
+from itertools import groupby
+
+import unittest
+import sys
+sys.path.append("..")
+
+from src.variables import variables
+
+class TestVariables(unittest.TestCase):
+    def test_multiplier_func_exclusion(self):
+        for variable in variables:
+            if variable.multiplier and variable.func:
+                result = variable.func(0)
+                self.assertTrue(isinstance(result, (int, float)))
+    def test_func_32_bit_exclusion(self):
+        for variable in variables:
+            if variable.is_32_bit:
+                self.assertIsNone(variable.func)
+    def test_common_lengths(self):
+        for key, group in groupby(variables, lambda x: x.address):
+            is_32_bit = next(group).is_32_bit
+            for variable in group:
+                self.assertEqual(variable.is_32_bit, is_32_bit)
+    def test_indexer(self):
+        variable = variables['battery_percentage']
+        self.assertEqual('battery_percentage', variable.name)
+        variable = variables.get('battery_percentage')
+        self.assertEqual('battery_percentage', variable.name)
+        self.assertIsNotNone(variables.items())
+    def test_slice(self):
+        x = variables[4:12]
+        self.assertEqual(8, len(x))
+if __name__ == "__main__":
+    unittest.main()