snap7_connect.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. from snap7.client import Client
  2. import time
  3. from datetime import datetime
  4. import struct
  5. from structures.plant import *
  6. from inputs.common import Input
# Local timezone of this host, resolved once when the module is imported.
# It is passed to datetime.now() / datetime.fromtimestamp() below so that
# every timestamp produced by this module is timezone-aware.
localtz = datetime.now().astimezone().tzinfo
  8. class SiemensCPU(Input):
  9. interval = 0.05
  10. def __init__(self, address) -> None:
  11. super().__init__(self.read_handler)
  12. self.address = address
  13. self.cpu = Client()
  14. def start(self):
  15. self.cpu.connect(self.address, rack=0, slot=0)
  16. self.cpu.get_connected()
  17. super().start()
  18. def read_handler(self):
  19. timestamp = datetime.now(localtz)
  20. cpu_state = self.cpu.get_cpu_state() == "S7CpuStatusRun"
  21. status = self.cpu.db_read(db_number=3, start=0, size=4)
  22. hydraulics_powered = status[0] & 1 > 0
  23. data = struct.unpack(">BBBB", status)
  24. cooling_enabled = tuple([data[1] & (1 << i) > 0 for i in range(8)])
  25. heating_enabled = tuple([data[2] & (1 << i) > 0 for i in range(8)])
  26. self._q.put(PlantState(timestamp, "S7", cpu_state, hydraulics_powered, cooling_enabled, heating_enabled))
  27. def get_timestamp(self, cpu_time):
  28. if not self.cpu_start_time:
  29. self.synchronize()
  30. cpu_diff = cpu_time - self.cpu_start_time
  31. date = datetime.fromtimestamp(self.local_start_time + cpu_diff, localtz)
  32. return date