"""Polling input that reads plant status bits from a Siemens S7 CPU via snap7."""

import logging
import time
from datetime import datetime
import struct

from snap7.client import Client
from snap7.exceptions import Snap7Exception
from snap7.types import Areas

# NOTE: import order is kept as in the original file so that the wildcard
# import cannot shadow ``Input`` differently than before.
from structures.plant import *
from inputs.common import Input

logger = logging.getLogger(__name__)

# Local timezone, captured once at import time; used for all timestamps below.
localtz = datetime.now().astimezone().tzinfo


def _bits(byte):
    """Return the 8 bits of *byte* as a tuple of bools, least significant first."""
    return tuple(byte & (1 << i) > 0 for i in range(8))


class SiemensCPU(Input):
    """Input that polls a status data block of an S7 CPU and queues ``PlantState``s.

    Each successful poll reads 4 bytes from the status data block and puts one
    ``PlantState`` on ``self._q``. Connection problems are tolerated: the
    handler drops the connection and tries again on a later poll.
    """

    # presumably the polling period in seconds consumed by ``Input`` — confirm
    interval = 0.05

    # Connection and addressing parameters (previously hard-coded inline).
    rack = 0
    slot = 0
    status_db = 3      # data block number holding the status bytes
    status_start = 0   # byte offset inside the data block
    status_size = 4    # number of bytes read; must match ">BBBB" below

    def __init__(self, address) -> None:
        """Create the input for the CPU reachable at *address* (not yet connected)."""
        super().__init__(self.read_handler)
        self.address = address
        self.cpu = Client()

    def _try_connect(self) -> bool:
        """Attempt to connect to the CPU; return True on success, False on failure.

        A failed attempt is not fatal: ``read_handler`` retries on later polls.
        """
        try:
            self.cpu.connect(self.address, rack=self.rack, slot=self.slot)
        except Snap7Exception as ex:
            logger.debug("S7 connect to %s failed: %s", self.address, ex)
            return False
        return True

    def start(self):
        """Try an initial connection, then start the underlying ``Input``."""
        self._try_connect()
        super().start()

    def read_handler(self):
        """Poll the CPU once and queue a ``PlantState`` if the read succeeds.

        When the client is not connected, or the read fails with a TCP-level
        error, the connection is dropped and nothing is queued for this poll.

        Raises:
            Snap7Exception: for read errors that are not TCP-related.
        """
        timestamp = datetime.now(localtz)

        if not self.cpu.get_connected():
            # Reset the client and retry. A failed reconnect must not escape
            # the poll callback; start() tolerates the same failure.
            self.cpu.disconnect()
            self._try_connect()
            return

        cpu_state = self.cpu.get_cpu_state() == "S7CpuStatusRun"

        try:
            status = self.cpu.read_area(
                area=Areas.DB,
                dbnumber=self.status_db,
                start=self.status_start,
                size=self.status_size,
            )
        except Snap7Exception as ex:
            if "TCP" not in str(ex):
                raise
            # TCP-level failure: drop the link; the next poll reconnects.
            logger.debug("S7 read from %s failed: %s", self.address, ex)
            self.cpu.disconnect()
            return

        # Byte 0 bit 0: hydraulics powered; byte 1: cooling flags;
        # byte 2: heating flags; byte 3 is read but not used.
        data = struct.unpack(">BBBB", status)
        hydraulics_powered = data[0] & 1 > 0
        cooling_enabled = _bits(data[1])
        heating_enabled = _bits(data[2])

        self._q.put(
            PlantState(
                timestamp,
                "S7",
                cpu_state,
                hydraulics_powered,
                cooling_enabled,
                heating_enabled,
            )
        )

    def get_timestamp(self, cpu_time):
        """Convert a CPU time value to a local-timezone ``datetime``.

        The result is ``local_start_time`` plus the offset of *cpu_time* from
        ``cpu_start_time``, interpreted as a POSIX timestamp.

        NOTE(review): ``cpu_start_time``, ``local_start_time`` and
        ``synchronize()`` are not defined in this class; unless ``Input``
        provides them this method raises ``AttributeError`` — confirm.
        """
        if not self.cpu_start_time:
            self.synchronize()
        cpu_diff = cpu_time - self.cpu_start_time
        date = datetime.fromtimestamp(self.local_start_time + cpu_diff, localtz)
        return date