123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- import snap7
- import time
- from datetime import datetime
- import struct
- from structures.measurement import Measurement
# Time zone of the host, attached to every timestamp produced by SiemensCPU.
localtz = datetime.now().astimezone().tzinfo


class SiemensCPU(snap7.client.Client):
    """snap7 client that turns a block of 32-bit integers into measurements.

    Slot 0 of the data block is read as the CPU's own clock and the
    following ``cpu_db_value_count`` slots as sampled values.  CPU clock
    readings are mapped onto local wall-clock time through a reference pair
    (``cpu_start_time``, ``local_start_time``) captured by ``synchronize``.
    """

    # CPU clock reading (raw value / 1000) captured at the last
    # synchronisation; None until ``synchronize`` has run.
    cpu_start_time = None
    # CPU clock reading of the most recently processed buffer; None if unknown.
    cpu_last_time = None
    # Local ``time.time()`` matching ``cpu_start_time``.  The class-level
    # default is evaluated once, when the class body is executed.
    local_start_time = time.time()
    # Number of the data block passed to ``db_read``.
    db = 1
    # Average difference between consecutive CPU clock readings, in the same
    # unit as ``cpu_start_time`` (presumably seconds -- confirm against the
    # PLC program).  Overwritten by ``synchronize``.
    interval = 0.05
    # Number of value slots that follow the clock slot in the data block.
    cpu_db_value_count = 50

    def get_ints(self, start, count):
        """Read ``count`` big-endian signed 32-bit integers from the data block.

        Args:
            start: Index of the first integer (slot index, not byte offset).
            count: Number of integers to read.

        Returns:
            A tuple of ``count`` ints.
        """
        # Every slot is 4 bytes wide, so slot indices are scaled to bytes.
        raw = self.db_read(self.db, start * 4, count * 4)
        return struct.unpack(f">{count}i", raw)

    def get_timestamp(self, cpu_time):
        """Convert a CPU clock reading into a timezone-aware local datetime.

        Runs ``synchronize`` first if no reference pair has been captured yet.

        Args:
            cpu_time: CPU clock reading in the unit used by ``cpu_start_time``.

        Returns:
            A ``datetime`` in ``localtz``.
        """
        # ``is None`` rather than truthiness: a clock reading of 0.0 is a
        # valid reference and must not force a new synchronisation.
        if self.cpu_start_time is None:
            self.synchronize()
        cpu_diff = cpu_time - self.cpu_start_time
        return datetime.fromtimestamp(self.local_start_time + cpu_diff, localtz)

    def synchronize(self, duration=3):
        """Capture a CPU/local reference pair and measure the update interval.

        Polls slot 0 for ``duration`` seconds.  Every time the CPU clock value
        changes, the reference pair is refreshed and the step since the
        previous value is recorded.  Blocks for roughly ``duration`` seconds.

        Args:
            duration: How long to poll, in seconds.

        Returns:
            The averaged interval, or the previous ``interval`` if fewer than
            two distinct clock values were seen.
        """
        start = time.monotonic()
        self.cpu_last_time = None
        intervals = []
        while time.monotonic() - start < duration:
            cpu_time = self.get_ints(0, 1)[0] / 1000
            if cpu_time != self.cpu_last_time:
                # Take the reference right at the moment a change is seen.
                self.cpu_start_time = cpu_time
                self.local_start_time = time.time()
                # Explicit None check so a previous reading of 0.0 still
                # contributes an interval sample.
                if self.cpu_last_time is not None:
                    intervals.append(cpu_time - self.cpu_last_time)
                self.cpu_last_time = cpu_time
            time.sleep(0.05)
        if intervals:
            self.interval = sum(intervals) / len(intervals)
        return self.interval

    def read(self):
        """Read the value buffer and return it as a list of measurements.

        The values are spread evenly over the span between the previously
        processed CPU clock reading and the current one.

        Returns:
            A list of ``Measurement`` objects, or an empty list when the CPU
            clock has not changed since the last call.
        """
        # Synchronise *before* fetching the buffer.  Otherwise the lazy
        # synchronisation inside get_timestamp() would run in the middle of
        # the batch, block for seconds and overwrite cpu_last_time while the
        # timestamps are still being computed from it.
        if self.cpu_start_time is None:
            self.synchronize()

        data = self.get_ints(0, self.cpu_db_value_count + 1)
        cpu_time = data[0] / 1000
        if self.cpu_last_time is None:
            # No previous buffer: assume it ended one interval ago.
            self.cpu_last_time = cpu_time - self.interval
        if cpu_time == self.cpu_last_time:
            return []

        # Snapshot the base so every sample of this batch uses the same one.
        base_time = self.cpu_last_time
        inc_time = (cpu_time - base_time) / self.cpu_db_value_count
        # Raw values are divided by 10 (presumably tenths of the physical
        # unit -- confirm against the PLC program).
        points = [
            Measurement(
                self.get_timestamp(base_time + inc_time * step),
                "siemens_cpu",
                0,
                val / 10,
            )
            for step, val in enumerate(data[1:], start=1)
        ]
        self.cpu_last_time = cpu_time
        return points

    def read_continous(self):
        """Yield measurements forever, one at a time, as buffers arrive.

        NOTE(review): polls ``read`` back-to-back without any delay when no
        new data is available -- confirm this load is acceptable for the PLC.
        """
        while True:
            yield from self.read()
-
|