snap7_connect.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. import snap7
  2. import time
  3. import struct
  4. class SiemensCPU(snap7.client.Client):
  5. cpu_start_time = None
  6. cpu_last_time = None
  7. local_start_time = time.time()
  8. db = 1
  9. interval = 0.05
  10. cpu_db_value_count = 100
  11. def get_ints(self, start, count):
  12. raw = self.db_read(self.db, start*4, count*4)
  13. data = struct.unpack(">" + "i" * count, raw)
  14. return data
  15. def get_timestamp(self, cpu_time):
  16. if self.cpu_start_time:
  17. cpu_diff = cpu_time - self.cpu_start_time
  18. return self.local_start_time + cpu_diff
  19. else:
  20. raise Exception("call synchronize first")
  21. def synchronize(self, duration = 3):
  22. start = time.monotonic()
  23. self.cpu_last_time = None
  24. intervals = []
  25. while time.monotonic() - start < duration:
  26. cpu_time = self.get_ints(0, 1)[0] / 1000
  27. if cpu_time != self.cpu_last_time:
  28. self.cpu_start_time = cpu_time
  29. self.local_start_time = time.time()
  30. if self.cpu_last_time:
  31. intervals.append(cpu_time - self.cpu_last_time)
  32. self.cpu_last_time = cpu_time
  33. time.sleep(0.05)
  34. if len(intervals) > 0:
  35. self.interval = sum(intervals) / len(intervals)
  36. return self.interval
  37. def read(self):
  38. data = self.get_ints(0, self.cpu_db_value_count + 1)
  39. cpu_time = data[0] / 1000
  40. if not self.cpu_last_time:
  41. self.cpu_last_time = cpu_time - self.interval
  42. if cpu_time == self.cpu_last_time:
  43. return []
  44. inc_time = (cpu_time - self.cpu_last_time) / self.cpu_db_value_count
  45. points = []
  46. for i, val in enumerate(data[1:]):
  47. val_time = self.get_timestamp(self.cpu_last_time + inc_time * (i+1))
  48. points.append((val_time, val))
  49. self.cpu_last_time = cpu_time
  50. return points
  51. def read_continous(self):
  52. while True:
  53. points = self.read()
  54. for point in points:
  55. yield point