12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- import snap7
- import time
- import struct
class SiemensCPU(snap7.client.Client):
    """snap7 client that turns a PLC data block into timestamped samples.

    The data block is read as an array of big-endian signed 32-bit integers.
    Element 0 is used as the CPU's clock value; it is divided by 1000 before
    use (presumably milliseconds -> seconds; confirm against the PLC
    program).  The next ``cpu_db_value_count`` elements are the sample values.

    Typical use: connect, call :meth:`synchronize` once to tie the CPU clock
    to the local wall clock, then call :meth:`read` or iterate
    :meth:`read_continous`.
    """

    # Size in bytes of one element decoded with the ">i" struct format.
    _BYTES_PER_INT = 4
    # Divisor applied to the raw clock element before it is used as a time.
    _MS_PER_SECOND = 1000

    # CPU clock value captured at the most recent tick seen by synchronize();
    # None until synchronize() has observed a tick.
    cpu_start_time = None
    # CPU clock value of the last block handled by synchronize()/read().
    cpu_last_time = None
    # Local wall-clock time matching cpu_start_time.  This class-level default
    # is evaluated once, when the class body runs; synchronize() replaces it.
    local_start_time = time.time()
    # Data block number handed to db_read().
    db = 1
    # Time between two CPU clock ticks; refined by synchronize().
    interval = 0.05
    # Number of sample values that follow the clock element in the data block.
    cpu_db_value_count = 100

    def get_ints(self, start, count):
        """Read ``count`` consecutive 32-bit integers from data block ``db``.

        Args:
            start: Index of the first integer, counted in 4-byte elements
                (it is multiplied by 4 to form the offset given to db_read).
            count: Number of integers to read.

        Returns:
            tuple: ``count`` ints decoded as big-endian signed 32-bit values.
        """
        raw = self.db_read(
            self.db,
            start * self._BYTES_PER_INT,
            count * self._BYTES_PER_INT,
        )
        return struct.unpack(">%di" % count, raw)

    def get_timestamp(self, cpu_time):
        """Convert a CPU clock value into a local wall-clock timestamp.

        Args:
            cpu_time: CPU clock value, on the same scale as ``cpu_start_time``.

        Returns:
            float: ``local_start_time`` shifted by the CPU time elapsed since
            ``cpu_start_time``.

        Raises:
            RuntimeError: If no reference point has been recorded yet.
        """
        # Compare against None explicitly: a CPU clock of 0.0 is a valid
        # reference point and must not be mistaken for "not synchronized".
        if self.cpu_start_time is None:
            raise RuntimeError("call synchronize first")
        return self.local_start_time + (cpu_time - self.cpu_start_time)

    def synchronize(self, duration=3, poll_interval=0.05):
        """Align the CPU clock with the local clock and measure the tick rate.

        Polls the clock element for ``duration`` seconds.  Every time its
        value changes, that value and the current local time become the new
        reference pair used by :meth:`get_timestamp`, and the difference to
        the previous value is recorded.

        Args:
            duration: How long to keep polling, in seconds.
            poll_interval: Pause between two polls, in seconds.

        Returns:
            float: ``interval`` -- the mean of the recorded differences, or
            the previous value when fewer than two ticks were observed.
        """
        started = time.monotonic()
        self.cpu_last_time = None
        tick_gaps = []

        while time.monotonic() - started < duration:
            cpu_time = self.get_ints(0, 1)[0] / self._MS_PER_SECOND
            if cpu_time != self.cpu_last_time:
                # The clock moved: this poll is the closest local observation
                # of that CPU time, so use it as the reference pair.
                self.cpu_start_time = cpu_time
                self.local_start_time = time.time()
                # "is not None" so that a first reading of 0.0 still counts
                # as a previous tick.
                if self.cpu_last_time is not None:
                    tick_gaps.append(cpu_time - self.cpu_last_time)
                self.cpu_last_time = cpu_time
            # NOTE(review): the source listing lost its indentation; the pause
            # is assumed to apply to every poll, not only after a tick --
            # confirm against the original file.
            time.sleep(poll_interval)

        if tick_gaps:
            self.interval = sum(tick_gaps) / len(tick_gaps)
        return self.interval

    def read(self):
        """Read the data block once and timestamp its sample values.

        The samples are spread evenly over the span between the previously
        seen CPU clock value and the current one, so the last sample carries
        the timestamp of the current clock value.

        Returns:
            list: ``(timestamp, value)`` tuples, or an empty list when the
            clock element has not changed since the previous call.

        Raises:
            RuntimeError: If :meth:`synchronize` has not recorded a reference.
        """
        data = self.get_ints(0, self.cpu_db_value_count + 1)
        cpu_time = data[0] / self._MS_PER_SECOND

        # Without a previous clock value, assume the block covers exactly one
        # interval.  The None test keeps a legitimate 0.0 from being replaced.
        if self.cpu_last_time is None:
            self.cpu_last_time = cpu_time - self.interval
        if cpu_time == self.cpu_last_time:
            return []

        previous = self.cpu_last_time
        step = (cpu_time - previous) / self.cpu_db_value_count
        points = [
            (self.get_timestamp(previous + step * position), value)
            for position, value in enumerate(data[1:], start=1)
        ]
        self.cpu_last_time = cpu_time
        return points

    def read_continous(self):
        """Yield ``(timestamp, value)`` points indefinitely.

        Calls :meth:`read` in an endless loop with no pause between calls and
        yields every point it returns, one at a time.
        """
        while True:
            yield from self.read()
-
|