123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- import time
- from datetime import datetime
- import struct
- from snap7.client import Client
- from snap7.exceptions import Snap7Exception
- from snap7.types import Areas
- from structures.plant import *
- from inputs.common import Input
- localtz = datetime.now().astimezone().tzinfo
- class SiemensCPU(Input):
- interval = 0.05
- def __init__(self, address) -> None:
- super().__init__(self.read_handler)
- self.address = address
- self.cpu = Client()
- def start(self):
- try:
- self.cpu.connect(self.address, rack=0, slot=0)
- except Snap7Exception:
- pass
- super().start()
- def read_handler(self):
- timestamp = datetime.now(localtz)
- cpu_state = self.cpu.get_cpu_state() == "S7CpuStatusRun"
- if self.cpu.get_connected():
- try:
- status = self.cpu.read_area(area=Areas.DB, dbnumber=3, start=0, size=4)
- #inputs = self.cpu.read_area(area=Areas.PE, dbnumber=0, start=32, size=112-32)
- except Snap7Exception as ex:
- if "TCP" in str(ex):
- self.cpu.disconnect()
- return
- else:
- raise ex
- else:
- self.cpu.disconnect()
- self.cpu.connect(self.address, rack=0, slot=0)
- return
- hydraulics_powered = status[0] & 1 > 0
- data = struct.unpack(">BBBB", status)
- #print(''.join(["{:02X}".format(x) for x in inputs]))
- unterteil_faehrt_aus, \
- unterteil_faehrt_ein, \
- auswerfer_1_heben, \
- auswerfer_1_senken, \
- auswerfer_2_heben, \
- auswerfer_2_senken, \
- abdruecker_seitenteil_links_einfahren, \
- abdruecker_seitenteil_links_ausfahren = [data[0] & (1 << i) > 0 for i in range(8)]
- abdruecker_seitenteil_rechts_einfahren, \
- abdruecker_seitenteil_rechts_ausfahren, \
- rolltor_schliessen, \
- rolltor_oeffnen, \
- gaswagen_ausfahren, \
- gaswagen_einfahren, \
- schwenkplatte_abklappen, \
- schwenkplatte_einklappen = [data[1] & (1 << i) > 0 for i in range(8)]
- losteil_1_unterteil_einfahren, \
- losteil_1_unterteil_ausfahren, \
- losteil_2_einfahren, \
- losteil_2_ausfahren, \
- losteil_3_einfahren, \
- losteil_3_ausfahren, \
- losteil_4_einfahren, \
- losteil_4_ausfahren = [data[2] & (1 << i) > 0 for i in range(8)]
- losteil_5_einfahren, \
- losteil_5_ausfahren, \
- seitenteil_links_schliessen, \
- seitenteil_rechts_schliessen, \
- seitenteil_links_und_rechts_oeffnen, \
- sandschleuse_schliessen, \
- sandschleuse_oeffnen, \
- sandmessung_3_wege_ventil_schliessen = [data[3] & (1 << i) > 0 for i in range(8)]
- sandmessung_3_wege_ventil_oeffnen, \
- schuss, \
- hubtisch_senken, \
- hubtisch_heben, \
- oberteil_heben, \
- oberteil_senken = [data[4] & (1 << i) > 0 for i in range(6)]
- self._q.put(PlantState(timestamp, "S7", cpu_state,
- unterteil_faehrt_aus, \
- unterteil_faehrt_ein, \
- auswerfer_1_heben, \
- auswerfer_1_senken, \
- auswerfer_2_heben, \
- auswerfer_2_senken, \
- abdruecker_seitenteil_links_einfahren, \
- abdruecker_seitenteil_links_ausfahren, \
- abdruecker_seitenteil_rechts_einfahren, \
- abdruecker_seitenteil_rechts_ausfahren, \
- rolltor_schliessen, \
- rolltor_oeffnen, \
- gaswagen_ausfahren, \
- gaswagen_einfahren, \
- schwenkplatte_abklappen, \
- schwenkplatte_einklappen, \
- losteil_1_unterteil_einfahren, \
- losteil_1_unterteil_ausfahren, \
- losteil_2_einfahren, \
- losteil_2_ausfahren, \
- losteil_3_einfahren, \
- losteil_3_ausfahren, \
- losteil_4_einfahren, \
- losteil_4_ausfahren, \
- losteil_5_einfahren, \
- losteil_5_ausfahren, \
- seitenteil_links_schliessen, \
- seitenteil_rechts_schliessen, \
- seitenteil_links_und_rechts_oeffnen, \
- sandschleuse_schliessen, \
- sandschleuse_oeffnen, \
- sandmessung_3_wege_ventil_schliessen, \
- sandmessung_3_wege_ventil_oeffnen, \
- schuss, \
- hubtisch_senken, \
- hubtisch_heben, \
- oberteil_heben, \
- oberteil_senken
- ))
- def get_timestamp(self, cpu_time):
- if not self.cpu_start_time:
- self.synchronize()
- cpu_diff = cpu_time - self.cpu_start_time
- date = datetime.fromtimestamp(self.local_start_time + cpu_diff, localtz)
- return date
-
|