import time from datetime import datetime import struct from snap7.client import Client from snap7.exceptions import Snap7Exception from snap7.types import Areas from structures.plant import * from inputs.common import Input localtz = datetime.now().astimezone().tzinfo class SiemensCPU(Input): interval = 0.05 def __init__(self, address) -> None: super().__init__(self.read_handler) self.address = address self.cpu = Client() def start(self): try: self.cpu.connect(self.address, rack=0, slot=0) except Snap7Exception: pass super().start() def read_handler(self): timestamp = datetime.now(localtz) cpu_state = self.cpu.get_cpu_state() == "S7CpuStatusRun" if self.cpu.get_connected(): try: status = self.cpu.read_area(area=Areas.DB, dbnumber=3, start=0, size=5) #inputs = self.cpu.read_area(area=Areas.PE, dbnumber=0, start=32, size=112-32) except Snap7Exception as ex: if "TCP" in str(ex): self.cpu.disconnect() return else: raise ex else: self.cpu.disconnect() self.cpu.connect(self.address, rack=0, slot=0) return hydraulics_powered = status[0] & 1 > 0 data = struct.unpack(">BBBBB", status) #print(''.join(["{:02X}".format(x) for x in inputs])) unterteil_faehrt_aus, \ unterteil_faehrt_ein, \ auswerfer_1_heben, \ auswerfer_1_senken, \ auswerfer_2_heben, \ auswerfer_2_senken, \ abdruecker_seitenteil_links_einfahren, \ abdruecker_seitenteil_links_ausfahren = [data[0] & (1 << i) > 0 for i in range(8)] abdruecker_seitenteil_rechts_einfahren, \ abdruecker_seitenteil_rechts_ausfahren, \ rolltor_schliessen, \ rolltor_oeffnen, \ gaswagen_ausfahren, \ gaswagen_einfahren, \ schwenkplatte_abklappen, \ schwenkplatte_einklappen = [data[1] & (1 << i) > 0 for i in range(8)] losteil_1_unterteil_einfahren, \ losteil_1_unterteil_ausfahren, \ losteil_2_einfahren, \ losteil_2_ausfahren, \ losteil_3_einfahren, \ losteil_3_ausfahren, \ losteil_4_einfahren, \ losteil_4_ausfahren = [data[2] & (1 << i) > 0 for i in range(8)] losteil_5_einfahren, \ losteil_5_ausfahren, \ seitenteil_links_schliessen, \ seitenteil_rechts_schliessen, \ seitenteil_links_und_rechts_oeffnen, \ sandschleuse_schliessen, \ sandschleuse_oeffnen, \ sandmessung_3_wege_ventil_schliessen = [data[3] & (1 << i) > 0 for i in range(8)] sandmessung_3_wege_ventil_oeffnen, \ schuss, \ hubtisch_senken, \ hubtisch_heben, \ oberteil_heben, \ oberteil_senken = [data[4] & (1 << i) > 0 for i in range(6)] self._q.put(PlantState(timestamp, "S7", cpu_state, unterteil_faehrt_aus, \ unterteil_faehrt_ein, \ auswerfer_1_heben, \ auswerfer_1_senken, \ auswerfer_2_heben, \ auswerfer_2_senken, \ abdruecker_seitenteil_links_einfahren, \ abdruecker_seitenteil_links_ausfahren, \ abdruecker_seitenteil_rechts_einfahren, \ abdruecker_seitenteil_rechts_ausfahren, \ rolltor_schliessen, \ rolltor_oeffnen, \ gaswagen_ausfahren, \ gaswagen_einfahren, \ schwenkplatte_abklappen, \ schwenkplatte_einklappen, \ losteil_1_unterteil_einfahren, \ losteil_1_unterteil_ausfahren, \ losteil_2_einfahren, \ losteil_2_ausfahren, \ losteil_3_einfahren, \ losteil_3_ausfahren, \ losteil_4_einfahren, \ losteil_4_ausfahren, \ losteil_5_einfahren, \ losteil_5_ausfahren, \ seitenteil_links_schliessen, \ seitenteil_rechts_schliessen, \ seitenteil_links_und_rechts_oeffnen, \ sandschleuse_schliessen, \ sandschleuse_oeffnen, \ sandmessung_3_wege_ventil_schliessen, \ sandmessung_3_wege_ventil_oeffnen, \ schuss, \ hubtisch_senken, \ hubtisch_heben, \ oberteil_heben, \ oberteil_senken )) def get_timestamp(self, cpu_time): if not self.cpu_start_time: self.synchronize() cpu_diff = cpu_time - self.cpu_start_time date = datetime.fromtimestamp(self.local_start_time + cpu_diff, localtz) return date