from snap7.client import Client import time from datetime import datetime import struct from structures.measurement import Measurement24v from inputs.common import Input localtz = datetime.now().astimezone().tzinfo class SiemensCPU(Input): cpu_start_time = None cpu_last_time = None local_start_time = time.time() db = 1 interval = 0.05 cpu_db_value_count = 50 def __init__(self) -> None: super().__init__(self.read_handler) self.cpu = Client() def start(self): self.cpu.connect("192.168.10.6", 0, 0) super().start() def read_handler(self): raw = self.cpu.db_read(self.db, 0, (self.cpu_db_value_count + 1)*4) data = struct.unpack(">" + "i" * (self.cpu_db_value_count + 1), raw) cpu_time = data[0] / 1000 if not self.cpu_last_time: self.cpu_last_time = cpu_time - self.interval if cpu_time == self.cpu_last_time: return inc_time = (cpu_time - self.cpu_last_time) / self.cpu_db_value_count for i, val in enumerate(data[1:]): timestamp = self.get_timestamp(self.cpu_last_time + inc_time * (i+1)) self._q.put(Measurement(timestamp, "S7", "24v_current", 0, val / 10)) self.cpu_last_time = cpu_time def get_timestamp(self, cpu_time): if not self.cpu_start_time: self.synchronize() cpu_diff = cpu_time - self.cpu_start_time date = datetime.fromtimestamp(self.local_start_time + cpu_diff, localtz) return date def synchronize(self, duration = 3): start = time.monotonic() self.cpu_last_time = None intervals = [] while time.monotonic() - start < duration: cpu_time = self.get_ints(0, 1)[0] / 1000 if cpu_time != self.cpu_last_time: self.cpu_start_time = cpu_time self.local_start_time = time.time() if self.cpu_last_time: intervals.append(cpu_time - self.cpu_last_time) self.cpu_last_time = cpu_time time.sleep(0.05) if len(intervals) > 0: self.interval = sum(intervals) / len(intervals) return self.interval