1234567891011121314151617181920212223242526272829303132333435363738394041 |
- from pylogix import PLC
- from threading import Thread
- import time
- from datetime import datetime
- import struct
- from structures.measurement import CurrentMeasurement
- from inputs.common import Input
# Local timezone of this host, resolved once when the module is imported and
# reused for every measurement timestamp.
localtz = (
    datetime.now()
    .astimezone()
    .tzinfo
)
class AllenBradleyCPU(Input):
    """Input that polls a PLC tag through pylogix and queues measurements.

    Each call to :meth:`read_handler` reads ``"<tag>:I"`` from the PLC,
    unpacks a 30-byte block starting at ``E_offset`` and puts one
    ``CurrentMeasurement`` on ``self._q`` for each of the first 16 bytes.

    NOTE(review): ``self._q`` and the use of ``self.interval`` are assumed to
    come from the ``Input`` base class, which is not visible here -- confirm.
    """

    # Layout of the block unpacked on every poll (big-endian, 30 bytes):
    # 16 unsigned bytes, five uint16, one unsigned byte, one pad byte, one
    # uint16. Compiled once instead of rebuilding the format string per read.
    _E_BLOCK = struct.Struct(">16B5HBxH")

    # Number of leading bytes of the block that are published as measurements.
    _CHANNEL_COUNT = 16

    def __init__(self, ip_address='192.168.10.5', tag="STL", port=0,
                 interval=0.02):
        """Configure the PLC connection and the byte offsets for one port.

        Args:
            ip_address: Address assigned to the pylogix ``PLC`` client.
            tag: Base tag name; ``":I"`` is appended when reading.
            port: Port index used to derive the byte offsets below.
            interval: Value stored in ``self.interval`` (presumably the poll
                period in seconds used by ``Input`` -- confirm).
        """
        super().__init__(self.read_handler)
        self.comm = PLC()
        self.comm.IPAddress = ip_address
        self.tag = tag
        self.port = port
        # Offsets into the raw tag value; each port advances the E block by
        # 48 bytes and the A block by 32 bytes.
        self.E_offset = 12 + self.port * 48
        self.E_connected_offset = self.E_offset + 32
        self.E_error_offset = self.E_offset + 33
        self.A_offset = 6 + self.port * 32
        self.interval = interval

    def read_handler(self):
        """Read the tag once and queue one measurement per channel.

        Raises:
            OSError: If the PLC read returned no value; the message carries
                the status reported by pylogix.
            struct.error: If the returned value is too short to hold the
                30-byte block at ``E_offset``.
        """
        timestamp = datetime.now(localtz)
        ret = self.comm.Read(f"{self.tag}:I")
        raw = ret.Value
        if raw is None:
            # A failed read leaves Value as None; fail with the PLC status
            # instead of an opaque TypeError from slicing None.
            raise OSError(
                f"PLC read of {self.tag}:I failed: {ret.Status}"
            )
        data = self._E_BLOCK.unpack_from(raw, self.E_offset)
        for channel, value in enumerate(data[:self._CHANNEL_COUNT]):
            self._q.put(CurrentMeasurement(timestamp, "AB", channel, value))
def _run_forever():
    """Start an AllenBradleyCPU input and block the main thread indefinitely."""
    with AllenBradleyCPU() as cpu:
        cpu.start()
        # Idle in one-second sleeps so the process stays alive after start().
        while True:
            time.sleep(1)


if __name__ == "__main__":
    _run_forever()
|