snap7_connect.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. import snap7
  2. import time
  3. from datetime import datetime
  4. import struct
  5. from structures.measurement import Measurement
  6. localtz = datetime.now().astimezone().tzinfo
  7. class SiemensCPU(snap7.client.Client):
  8. cpu_start_time = None
  9. cpu_last_time = None
  10. local_start_time = time.time()
  11. db = 1
  12. interval = 0.05
  13. cpu_db_value_count = 50
  14. def get_ints(self, start, count):
  15. raw = self.db_read(self.db, start*4, count*4)
  16. data = struct.unpack(">" + "i" * count, raw)
  17. return data
  18. def get_timestamp(self, cpu_time):
  19. if not self.cpu_start_time:
  20. self.synchronize()
  21. cpu_diff = cpu_time - self.cpu_start_time
  22. date = datetime.fromtimestamp(self.local_start_time + cpu_diff, localtz)
  23. return date
  24. def synchronize(self, duration = 3):
  25. start = time.monotonic()
  26. self.cpu_last_time = None
  27. intervals = []
  28. while time.monotonic() - start < duration:
  29. cpu_time = self.get_ints(0, 1)[0] / 1000
  30. if cpu_time != self.cpu_last_time:
  31. self.cpu_start_time = cpu_time
  32. self.local_start_time = time.time()
  33. if self.cpu_last_time:
  34. intervals.append(cpu_time - self.cpu_last_time)
  35. self.cpu_last_time = cpu_time
  36. time.sleep(0.05)
  37. if len(intervals) > 0:
  38. self.interval = sum(intervals) / len(intervals)
  39. return self.interval
  40. def read(self):
  41. data = self.get_ints(0, self.cpu_db_value_count + 1)
  42. cpu_time = data[0] / 1000
  43. if not self.cpu_last_time:
  44. self.cpu_last_time = cpu_time - self.interval
  45. if cpu_time == self.cpu_last_time:
  46. return []
  47. inc_time = (cpu_time - self.cpu_last_time) / self.cpu_db_value_count
  48. points = []
  49. for i, val in enumerate(data[1:]):
  50. timestamp = self.get_timestamp(self.cpu_last_time + inc_time * (i+1))
  51. points.append(Measurement(timestamp, "siemens_cpu", 0, val / 10))
  52. self.cpu_last_time = cpu_time
  53. return points
  54. def read_continous(self):
  55. while True:
  56. points = self.read()
  57. for point in points:
  58. yield point