|
@@ -1,14 +1,22 @@
|
|
|
from dataclasses import dataclass
|
|
|
+from enum import Enum
|
|
|
import struct
|
|
|
-from typing import List, Dict, Callable
|
|
|
+from typing import List, Dict, Callable, Any
|
|
|
|
|
|
from .crc import crc16
|
|
|
|
|
|
-@dataclass
|
|
|
-class FunctionCode():
|
|
|
- code: int
|
|
|
- direction: str
|
|
|
- designation: str
|
|
|
+type Value = str|int|float
|
|
|
+
|
|
|
+
|
|
|
+class FunctionCodes(Enum):
|
|
|
+ # Read
|
|
|
+ READ_STATUS_REGISTER = 0x02 # Read the switch input status
|
|
|
+ READ_PARAMETER = 0x03 # Read multiple hold registers
|
|
|
+ READ_MEMORY = 0x04 # Read input register
|
|
|
+ # Write
|
|
|
+ WRITE_STATUS_REGISTER = 0x05 # Write single register
|
|
|
+ WRITE_MEMORY_SINGLE = 0x06 # Write single hold register
|
|
|
+ WRITE_MEMORY_RANGE = 0x10 # Write multiple hold registers
|
|
|
|
|
|
@dataclass
|
|
|
class Variable():
|
|
@@ -22,7 +30,7 @@ class Variable():
|
|
|
friendly_name: str
|
|
|
func: Callable[int, str]
|
|
|
|
|
|
-class ModbusParser:
|
|
|
+class LumiaxClient:
|
|
|
def _get_functional_status_registers(self, function_codes: list[int], offset: int):
|
|
|
return [
|
|
|
# Controller functional status 1
|
|
@@ -37,55 +45,55 @@ class ModbusParser:
|
|
|
|
|
|
# Controller functional status 2
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "infrared_function_available", "Is infrared function available",
|
|
|
- lambda x: (x >> 15) & 1),
|
|
|
+ lambda x: (x >> 15) & 1 == 1),
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "automatic_power_reduction_available", "Is automatic power reduction setting available(only in 365 mode)",
|
|
|
- lambda x: (x >> 14) & 1),
|
|
|
+ lambda x: (x >> 14) & 1 == 1),
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "charging_at_zero_celsius_available", "Is 0°C prohibit charging setting available",
|
|
|
- lambda x: (x >> 13) & 1),
|
|
|
+ lambda x: (x >> 13) & 1 == 1),
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "grade_of_rated_voltage_available", "Is grade of rated voltage setting available",
|
|
|
- lambda x: (x >> 12) & 1),
|
|
|
+ lambda x: (x >> 12) & 1 == 1),
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "overcharge_recovery_voltage_available", "Is overcharge recovery voltage setting available (only lithium battery)",
|
|
|
- lambda x: (x >> 11) & 1),
|
|
|
+ lambda x: (x >> 11) & 1 == 1),
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "overcharge_protection_available", "Is overcharge protection setting available (only lithium battery)",
|
|
|
- lambda x: (x >> 10) & 1),
|
|
|
+ lambda x: (x >> 10) & 1 == 1),
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "floating_charge_voltage_available", "Is floating charge voltage setting available",
|
|
|
- lambda x: (x >> 9) & 1),
|
|
|
+ lambda x: (x >> 9) & 1 == 1),
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "equilibrium_charge_voltage_available", "Is equilibrium charge voltage setting available",
|
|
|
- lambda x: (x >> 8) & 1),
|
|
|
+ lambda x: (x >> 8) & 1 == 1),
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "strong_charging_voltage_available", "Is strong charging voltage setting available",
|
|
|
- lambda x: (x >> 7) & 1),
|
|
|
+ lambda x: (x >> 7) & 1 == 1),
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "low_voltage_recovery_voltage_available", "Is low voltage recovery setting available",
|
|
|
- lambda x: (x >> 6) & 1),
|
|
|
+ lambda x: (x >> 6) & 1 == 1),
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "low_voltage_protection_voltage_available", "Is low voltage protection setting available",
|
|
|
- lambda x: (x >> 5) & 1),
|
|
|
+ lambda x: (x >> 5) & 1 == 1),
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "battery_type_available", "Is Battery Type setting available",
|
|
|
- lambda x: (x >> 4) & 1),
|
|
|
+ lambda x: (x >> 4) & 1 == 1),
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "backlight_time_available", "Is Backlight Time setting available",
|
|
|
- lambda x: (x >> 3) & 1),
|
|
|
+ lambda x: (x >> 3) & 1 == 1),
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "device_time_available", "Is Device Time setting available",
|
|
|
- lambda x: (x >> 2) & 1),
|
|
|
+ lambda x: (x >> 2) & 1 == 1),
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "device_id_available", "Is Device ID setting available",
|
|
|
- lambda x: (x >> 1) & 1),
|
|
|
+ lambda x: (x >> 1) & 1 == 1),
|
|
|
Variable(offset + 1, False, False, function_codes, "", 0, "device_password_available", "Is Device password setting available",
|
|
|
- lambda x: (x >> 0) & 1),
|
|
|
+ lambda x: (x >> 0) & 1 == 1),
|
|
|
|
|
|
# Controller functional status 3
|
|
|
Variable(offset + 2, False, False, function_codes, "", 0, "six_time_frame_mode_available", "Is Six Time Frame Mode available",
|
|
|
- lambda x: (x >> 7) & 1),
|
|
|
+ lambda x: (x >> 7) & 1 == 1),
|
|
|
Variable(offset + 2, False, False, function_codes, "", 0, "five_time_frame_mode_available", "Is Five Time Frame Mode available",
|
|
|
- lambda x: (x >> 6) & 1),
|
|
|
+ lambda x: (x >> 6) & 1 == 1),
|
|
|
Variable(offset + 2, False, False, function_codes, "", 0, "timing_control_mode_available", "Is Timing Control available",
|
|
|
- lambda x: (x >> 5) & 1),
|
|
|
+ lambda x: (x >> 5) & 1 == 1),
|
|
|
Variable(offset + 2, False, False, function_codes, "", 0, "t0t_mode_available", "Is T0T Mode available",
|
|
|
- lambda x: (x >> 4) & 1),
|
|
|
+ lambda x: (x >> 4) & 1 == 1),
|
|
|
Variable(offset + 2, False, False, function_codes, "", 0, "fixed_duration_mode_available", "Is Fixed Light Up Duration Mode available",
|
|
|
- lambda x: (x >> 3) & 1),
|
|
|
+ lambda x: (x >> 3) & 1 == 1),
|
|
|
Variable(offset + 2, False, False, function_codes, "", 0, "d2d_mode_available", "Is D2D Mode available",
|
|
|
- lambda x: (x >> 2) & 1),
|
|
|
+ lambda x: (x >> 2) & 1 == 1),
|
|
|
Variable(offset + 2, False, False, function_codes, "", 0, "24h_mode_available", "Is 24H Mode available",
|
|
|
- lambda x: (x >> 1) & 1),
|
|
|
+ lambda x: (x >> 1) & 1 == 1),
|
|
|
Variable(offset + 2, False, False, function_codes, "", 0, "manual_operation_mode_available", "Is Manual Operation Mode available",
|
|
|
- lambda x: (x >> 0) & 1),
|
|
|
+ lambda x: (x >> 0) & 1 == 1),
|
|
|
|
|
|
# Controller functional status 4 (reserved)
|
|
|
# Variable(offset + 3, False, False, function_codes, "", 0, "controller_functional_status_4", "Controller functional status 4", None),
|
|
@@ -95,52 +103,42 @@ class ModbusParser:
|
|
|
return [
|
|
|
# Battery status
|
|
|
Variable(offset, False, False, [0x04], "", 0, "battery_temperature_protection_status", "Battery temperature protection status",
|
|
|
- lambda x: ["Normal", "High temperature protection"][(x >> 4) & 0xF]),
|
|
|
+ lambda x: ["Normal", "High temperature protection"][(x >> 4) & 0x1]),
|
|
|
Variable(offset, False, False, [0x04], "", 0, "battery_voltage_protection_status", "Battery voltage protection status",
|
|
|
lambda x: ["Normal", "Over voltage protection", "Voltage is low", "Low voltage protection"][(x >> 0) & 0xF]),
|
|
|
|
|
|
# Charge status
|
|
|
Variable(offset + 1, False, False, [0x04], "", 0, "solar_panel_charge_disabled", "Is charging manually disabled",
|
|
|
- lambda x: (x >> 6) & 0x1),
|
|
|
+ lambda x: (x >> 6) & 0x1 == 1),
|
|
|
Variable(offset + 1, False, False, [0x04], "", 0, "solar_panel_is_night", "Is solar panel night",
|
|
|
- lambda x: (x >> 5) & 0x1),
|
|
|
+ lambda x: (x >> 5) & 0x1 == 1),
|
|
|
Variable(offset + 1, False, False, [0x04], "", 0, "solar_panel_charge_over_temperature", "Is charge over temperature",
|
|
|
- lambda x: (x >> 4) & 0x1),
|
|
|
+ lambda x: (x >> 4) & 0x1 == 1),
|
|
|
Variable(offset + 1, False, False, [0x04], "", 0, "solar_panel_charge_state", "Solar panel charge status",
|
|
|
lambda x: ["Not charging", "Float charge", "Boost charge", "Equal charge"][(x >> 2) & 0x3]),
|
|
|
Variable(offset + 1, False, False, [0x04], "", 0, "solar_panel_charge_state", "Is charge fault",
|
|
|
- lambda x: (x >> 1) & 0x1),
|
|
|
+ lambda x: (x >> 1) & 0x1 == 1),
|
|
|
Variable(offset + 1, False, False, [0x04], "", 0, "solar_panel_is_charging", "Solar panel is charging",
|
|
|
- lambda x: (x >> 0) & 0x1),
|
|
|
+ lambda x: (x >> 0) & 0x1 == 1),
|
|
|
|
|
|
# Discharge status
|
|
|
Variable(offset + 2, False, False, [0x04], "", 0, "load_state", "Load status",
|
|
|
lambda x: ["Light load", "Moderate load", "Rated load", "Overload"][(x >> 12) & 0x3]),
|
|
|
Variable(offset + 2, False, False, [0x04], "", 0, "output_short_circuit", "Is output short circuit",
|
|
|
- lambda x: (x >> 11) & 0x1),
|
|
|
+ lambda x: (x >> 11) & 0x1 == 1),
|
|
|
Variable(offset + 2, False, False, [0x04], "", 0, "output_hardware_protection", "Is output hardware protection",
|
|
|
- lambda x: (x >> 4) & 0x1),
|
|
|
+ lambda x: (x >> 4) & 0x1 == 1),
|
|
|
Variable(offset + 2, False, False, [0x04], "", 0, "output_open_circuit_protection", "Is output open circuit protection",
|
|
|
- lambda x: (x >> 3) & 0x1),
|
|
|
+ lambda x: (x >> 3) & 0x1 == 1),
|
|
|
Variable(offset + 2, False, False, [0x04], "", 0, "output_over_temperature", "Is output over temperature",
|
|
|
- lambda x: (x >> 2) & 0x1),
|
|
|
+ lambda x: (x >> 2) & 0x1 == 1),
|
|
|
Variable(offset + 2, False, False, [0x04], "", 0, "output_fault", "Is output fault",
|
|
|
- lambda x: (x >> 1) & 0x1),
|
|
|
+ lambda x: (x >> 1) & 0x1 == 1),
|
|
|
Variable(offset + 2, False, False, [0x04], "", 0, "load_is_enabled", "Is load enabled",
|
|
|
- lambda x: (x >> 0) & 0x1),
|
|
|
+ lambda x: (x >> 0) & 0x1 == 1),
|
|
|
]
|
|
|
|
|
|
def __init__(self):
|
|
|
- # Available function codes
|
|
|
- self.function_codes = {
|
|
|
- 0x02: FunctionCode(0x02, "R", "Read the switch input status"),
|
|
|
- 0x03: FunctionCode(0x03, "R", "Read multiple hold registers"),
|
|
|
- 0x04: FunctionCode(0x04, "R", "Read input register"),
|
|
|
- 0x05: FunctionCode(0x05, "W", "Write single register"),
|
|
|
- 0x06: FunctionCode(0x06, "W", "Write single hold register"),
|
|
|
- 0x10: FunctionCode(0x10, "W", "Write multiple hold registers"),
|
|
|
- }
|
|
|
-
|
|
|
# List of addresses to variable information
|
|
|
self.variables = [
|
|
|
Variable(0x2000, False, False, [0x02], "", 0, "equipment_internal_over_temperature", "Equipment internal over temperature",
|
|
@@ -235,7 +233,7 @@ class ModbusParser:
|
|
|
Variable(0x3009, False, False, [0x04], "A", 100, "load_rated_current", "Load rated current", None),
|
|
|
Variable(0x300A, True, False, [0x04], "W", 100, "load_rated_power", "Load rated power", None),
|
|
|
|
|
|
- ] + self._get_functional_status_registers([0x04], 0x8FF0) + [
|
|
|
+ ] + self._get_functional_status_registers([0x03], 0x8FF0) + [
|
|
|
|
|
|
Variable(0x8FF4, False, False, [0x03], "V", 100, "lvd_min_setting_value", "Low voltage detect min setting value", None),
|
|
|
Variable(0x8FF5, False, False, [0x03], "V", 100, "lvd_max_setting_value", "Low voltage detect max setting value", None),
|
|
@@ -280,7 +278,7 @@ class ModbusParser:
|
|
|
str(max((x>> 0) & 0xF, 9))),
|
|
|
Variable(0x9020, False, False, [0x03, 0x06, 0x10], "", 0, "slave_id", "Slave ID", None),
|
|
|
Variable(0x9021, False, False, [0x03, 0x06, 0x10], "", 0, "battery_type", "Battery type",
|
|
|
- lambda x: ["Lithium battery", "Liquid", "GEL", "AGM"][(x >> 0) & 0xF]),
|
|
|
+ lambda x: ["Lithium", "Liquid", "GEL", "AGM"][(x >> 0) & 0xF]),
|
|
|
Variable(0x9022, False, False, [0x03, 0x06, 0x10], "V", 100, "low_voltage_protection_voltage", "Low voltage protection", None),
|
|
|
Variable(0x9023, False, False, [0x03, 0x06, 0x10], "V", 100, "low_voltage_recovery_voltage", "Low voltage recovery", None),
|
|
|
Variable(0x9024, False, False, [0x03, 0x06, 0x10], "V", 100, "boost_voltage", "Boost voltage", None),
|
|
@@ -366,7 +364,7 @@ class ModbusParser:
|
|
|
lambda x: ["", "Clear"][x]),
|
|
|
]
|
|
|
|
|
|
- def bytes_to_value(self, variable: Variable, buffer: bytes, offset: int):
|
|
|
+ def bytes_to_value(self, variable: Variable, buffer: bytes, offset: int) -> Value:
|
|
|
if variable.is_32_bit and variable.is_signed:
|
|
|
raw_value = struct.unpack_from(">H", buffer, offset)[0] | struct.unpack_from(">h", buffer, offset + 2)[0] << 16
|
|
|
elif variable.is_32_bit:
|
|
@@ -379,11 +377,15 @@ class ModbusParser:
|
|
|
if variable.multiplier:
|
|
|
value = raw_value / variable.multiplier
|
|
|
elif variable.func:
|
|
|
- value = variable.func(raw_value)
|
|
|
+ try:
|
|
|
+ value = variable.func(raw_value)
|
|
|
+ except IndexError as e:
|
|
|
+ raise Exception(f"unexpected value for {variable.name} ({hex(variable.address)}): '{raw_value}'")
|
|
|
else:
|
|
|
value = raw_value
|
|
|
+ return value
|
|
|
|
|
|
- def value_to_bytes(self, variable: Variable, buffer: bytes, offset: int, value: str) -> int:
|
|
|
+ def value_to_bytes(self, variable: Variable, buffer: bytearray, offset: int, value: Value) -> int:
|
|
|
if variable.multiplier:
|
|
|
raw_value = round(float(value) * variable.multiplier)
|
|
|
elif variable.func:
|
|
@@ -407,52 +409,134 @@ class ModbusParser:
|
|
|
length = 4 if variable.is_32_bit else 2
|
|
|
return offset + length
|
|
|
|
|
|
- def parse(self, start_address: int, buffer: bytes) -> List[(Variable, ...)]:
|
|
|
+ def get_read_command(self, device_id: int, start_address: int, count: int) -> bytes:
|
|
|
+ variables = [v for v in self.variables if v.address >= start_address and v.address < start_address + count]
|
|
|
+ if not variables:
|
|
|
+ raise Exception(f"the range {hex(start_address)}-{hex(start_address+count-1)} contains no variables")
|
|
|
+
|
|
|
+ function_code = variables[0].function_codes[0]
|
|
|
+ if not all(function_code in v.function_codes for v in variables):
|
|
|
+ raise Exception(f"the range {hex(start_address)}-{hex(start_address+count-1)} spans multiple function codes")
|
|
|
+
|
|
|
+ result = bytes([
|
|
|
+ device_id,
|
|
|
+ function_code,
|
|
|
+ start_address >> 8,
|
|
|
+ start_address & 0xFF,
|
|
|
+ count >> 8,
|
|
|
+ count & 0xFF
|
|
|
+ ])
|
|
|
+ return result + crc16(result)
|
|
|
+
|
|
|
+ def get_write_command(self, device_id: int, values: list[(Variable, Any)]) -> bytes:
|
|
|
+ if not values:
|
|
|
+ raise Exception(f"values list is empty")
|
|
|
+ values.sort(key=lambda x: x[0].address)
|
|
|
+ address = values[0][0].address
|
|
|
+ for variable, value in values:
|
|
|
+ if value is None:
|
|
|
+ raise Exception(f"value of {variable.name} ({hex(variable.address)}) is empty")
|
|
|
+ if address < variable.address:
|
|
|
+ raise Exception(f"variables are not continuous at {hex(variable.address)}")
|
|
|
+ address = variable.address + (2 if variable.is_32_bit else 1)
|
|
|
+
|
|
|
+ start_variable = values[0][0]
|
|
|
+ end_variable = values[-1][0]
|
|
|
+ start_address = start_variable.address
|
|
|
+ end_address = end_variable.address + (1 if end_variable.is_32_bit else 0)
|
|
|
+ count = end_address - start_address + 1
|
|
|
+ byte_count = count * 2
|
|
|
+ if byte_count > 255:
|
|
|
+ raise Exception(f"address range is too large")
|
|
|
+
|
|
|
+ if count > 1:
|
|
|
+ function_code = FunctionCodes.WRITE_MEMORY_RANGE
|
|
|
+ header = bytes([
|
|
|
+ device_id,
|
|
|
+ function_code.value,
|
|
|
+ start_address >> 8,
|
|
|
+ start_address & 0xFF,
|
|
|
+ count >> 8,
|
|
|
+ count & 0xFF,
|
|
|
+ byte_count,
|
|
|
+ ])
|
|
|
+ else:
|
|
|
+ if FunctionCodes.WRITE_STATUS_REGISTER.value in values[0][0].function_codes:
|
|
|
+ function_code = FunctionCodes.WRITE_STATUS_REGISTER
|
|
|
+ else:
|
|
|
+ function_code = FunctionCodes.WRITE_MEMORY_SINGLE
|
|
|
+ header = bytes([
|
|
|
+ device_id,
|
|
|
+ function_code.value,
|
|
|
+ start_address >> 8,
|
|
|
+ start_address & 0xFF,
|
|
|
+ ])
|
|
|
+
|
|
|
+ if not all(function_code.value in x[0].function_codes for x in values):
|
|
|
+ raise Exception(f"function code {function_code.name} is not supported for all addresses")
|
|
|
+
|
|
|
+ data = bytearray(byte_count)
|
|
|
+ for variable, value in values:
|
|
|
+ offset = (variable.address - start_address) * 2
|
|
|
+ self.value_to_bytes(variable, data, offset, value)
|
|
|
+
|
|
|
+ result = header + bytes(data)
|
|
|
+ return result + crc16(result)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ def parse(self, start_address: int, buffer: bytes) -> list[(Variable, Value)]:
|
|
|
device_id = buffer[0]
|
|
|
- function_code = buffer[1]
|
|
|
- data_length = buffer[2]
|
|
|
- received_crc = buffer[3+data_length:3+data_length+2]
|
|
|
- calculated_crc = crc16(buffer[:3+data_length])
|
|
|
- if received_crc != calculated_crc:
|
|
|
- raise Exception("CRC mismatch")
|
|
|
-
|
|
|
- results = []
|
|
|
- address = start_address
|
|
|
- cursor = 3
|
|
|
- while cursor < data_length + 3:
|
|
|
- n_bytes = 2
|
|
|
- variables = [v for v in self.variables if address == v.address and function_code in v.function_codes]
|
|
|
- for variable in variables:
|
|
|
- value = self.bytes_to_value(variable, buffer, cursor)
|
|
|
- results.append((variable, value))
|
|
|
- n_bytes = 4 if variable.is_32_bit else 2
|
|
|
- cursor += n_bytes
|
|
|
- address += n_bytes
|
|
|
-
|
|
|
- return results
|
|
|
+ function_code = FunctionCodes(buffer[1])
|
|
|
+ if function_code in [FunctionCodes.READ_MEMORY, FunctionCodes.READ_PARAMETER, FunctionCodes.READ_STATUS_REGISTER]:
|
|
|
+ data_length = buffer[2]
|
|
|
+ received_crc = buffer[3+data_length:3+data_length+2]
|
|
|
+ calculated_crc = crc16(buffer[:3+data_length])
|
|
|
+ if received_crc != calculated_crc:
|
|
|
+ raise Exception(f"CRC mismatch ({calculated_crc} != {received_crc})")
|
|
|
+
|
|
|
+ results = []
|
|
|
+ address = start_address
|
|
|
+ cursor = 3
|
|
|
+ while cursor < data_length + 3:
|
|
|
+ variables = [v for v in self.variables if address == v.address and function_code.value in v.function_codes]
|
|
|
+ for variable in variables:
|
|
|
+ value = self.bytes_to_value(variable, buffer, cursor)
|
|
|
+ results.append((variable, value))
|
|
|
+ cursor += 2
|
|
|
+ address += 1
|
|
|
+
|
|
|
+ return results
|
|
|
+ else:
|
|
|
+ address = struct.unpack_from('>H', buffer, 2)[0]
|
|
|
+ if address != start_address:
|
|
|
+ raise Exception(f"Write result address mismatch ({hex(address)} != {hex(start_address)})")
|
|
|
+ received_crc = buffer[6:8]
|
|
|
+ calculated_crc = crc16(buffer[:6])
|
|
|
+ if received_crc != calculated_crc:
|
|
|
+ raise Exception(f"CRC mismatch ({calculated_crc} != {received_crc})")
|
|
|
+ return []
|
|
|
|
|
|
def _find_raw_value_by_brute_force(self, variable: Variable, value):
|
|
|
n_bits = 32 if variable.is_32_bit else 16
|
|
|
if variable.is_signed:
|
|
|
for i in range(0, 2**(n_bits-1) + 1):
|
|
|
- if variable.func(i) == value:
|
|
|
- return i
|
|
|
+ try:
|
|
|
+ if variable.func(i) == value:
|
|
|
+ return i
|
|
|
+ except IndexError:
|
|
|
+ pass
|
|
|
for i in range(0, -2**(n_bits-1) - 2, -1):
|
|
|
- if variable.func(i) == value:
|
|
|
- return i
|
|
|
+ try:
|
|
|
+ if variable.func(i) == value:
|
|
|
+ return i
|
|
|
+ except IndexError:
|
|
|
+ pass
|
|
|
else:
|
|
|
for i in range(0, 2**n_bits + 1):
|
|
|
- if variable.func(i) == value:
|
|
|
- return i
|
|
|
+ try:
|
|
|
+ if variable.func(i) == value:
|
|
|
+ return i
|
|
|
+ except IndexError:
|
|
|
+ pass
|
|
|
return None
|
|
|
-
|
|
|
-# Example usage
|
|
|
-if __name__ == "__main__":
|
|
|
- # Example byte range received for parsing
|
|
|
- received_bytes = bytes.fromhex("41 01 13 F7 00 0F 04 38 04 B0 04 60 04 74 05 00")
|
|
|
- parser = ModbusParser()
|
|
|
- parsed_data = parser.parse(0x3011, received_bytes)
|
|
|
-
|
|
|
- print("Parsed data:")
|
|
|
- for name, details in parsed_data.items():
|
|
|
- print(f"{name}: {details['value']} {details['unit']}")
|