123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- import time
- from datetime import datetime
- import struct
- from snap7.client import Client
- from snap7.exceptions import Snap7Exception
- from snap7.types import Areas
- from structures.plant import *
- from inputs.common import Input
- localtz = datetime.now().astimezone().tzinfo
- class SiemensCPU(Input):
- interval = 0.05
- def __init__(self, address) -> None:
- super().__init__(self.read_handler)
- self.address = address
- self.cpu = Client()
- def start(self):
- try:
- self.cpu.connect(self.address, rack=0, slot=0)
- except Snap7Exception:
- pass
- super().start()
- def read_handler(self):
- timestamp = datetime.now(localtz)
- cpu_state = self.cpu.get_cpu_state() == "S7CpuStatusRun"
- if self.cpu.get_connected():
- try:
- status = self.cpu.read_area(area=Areas.DB, dbnumber=3, start=0, size=4)
- #inputs = self.cpu.read_area(area=Areas.PE, dbnumber=0, start=32, size=112-32)
- except Snap7Exception as ex:
- if "TCP" in str(ex):
- self.cpu.disconnect()
- return
- else:
- raise ex
- else:
- self.cpu.disconnect()
- self.cpu.connect(self.address, rack=0, slot=0)
- return
- hydraulics_powered = status[0] & 1 > 0
- data = struct.unpack(">BBBB", status)
- #print(''.join(["{:02X}".format(x) for x in inputs]))
- cooling_enabled = tuple([data[1] & (1 << i) > 0 for i in range(8)])
- heating_enabled = tuple([data[2] & (1 << i) > 0 for i in range(8)])
- self._q.put(PlantState(timestamp, "S7", cpu_state, hydraulics_powered, cooling_enabled, heating_enabled))
- def get_timestamp(self, cpu_time):
- if not self.cpu_start_time:
- self.synchronize()
- cpu_diff = cpu_time - self.cpu_start_time
- date = datetime.fromtimestamp(self.local_start_time + cpu_diff, localtz)
- return date
-
|