allen_bradley_connect.py 1.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. from pylogix import PLC
  2. from threading import Thread
  3. import time
  4. from datetime import datetime
  5. import struct
  6. from structures.measurement import CurrentMeasurement
  7. from inputs.common import Input
  8. localtz = datetime.now().astimezone().tzinfo
  9. class AllenBradleyCPU(Input):
  10. def __init__(self):
  11. super().__init__(self.read_handler)
  12. self.comm = PLC()
  13. self.comm.IPAddress = '192.168.10.5'
  14. self.tag = "STL"
  15. self.port = 0
  16. self.E_offset = 12 + self.port * 48
  17. self.E_connected_offset = self.E_offset + 32
  18. self.E_error_offset = self.E_offset + 33
  19. self.A_offset = 6 + self.port * 32
  20. self.interval = 0.02
  21. def read_handler(self):
  22. timestamp = datetime.now(localtz)
  23. ret = self.comm.Read(F"{self.tag}:I")
  24. raw = ret.Value
  25. data = struct.unpack(">" + "B" * 16 + "HHHHHBxH", raw[self.E_offset:self.E_offset+30])
  26. for i in range(16):
  27. self._q.put(CurrentMeasurement(timestamp, "AB", i, data[i]))
  28. if __name__ == "__main__":
  29. with AllenBradleyCPU() as cpu:
  30. cpu.start()
  31. while True:
  32. time.sleep(1)