import logging
import struct
import time
from datetime import datetime
from threading import Thread

from pylogix import PLC

from inputs.common import Input
from structures.measurement import CurrentMeasurement

logger = logging.getLogger(__name__)

# Local timezone, captured once at import time and used to stamp measurements.
localtz = datetime.now().astimezone().tzinfo

# Big-endian layout of the 30-byte frame unpacked on every poll:
# 16 unsigned bytes, five unsigned 16-bit words, one unsigned byte,
# one pad byte and a final unsigned 16-bit word.
_E_FRAME = struct.Struct(">16B5HBxH")

# Only the leading byte values of the frame are published as measurements.
_CHANNEL_COUNT = 16


class AllenBradleyCPU(Input):
    """Input that polls a PLC tag via pylogix and queues current measurements.

    Each call to :meth:`read_handler` reads ``"<tag>:I"``, unpacks a 30-byte
    frame starting at ``E_offset`` and puts one ``CurrentMeasurement`` per
    leading byte value (16 in total) on ``self._q``.
    """

    def __init__(
        self,
        ip_address: str = '192.168.10.5',
        tag: str = "STL",
        port: int = 0,
        interval: float = 0.02,
    ) -> None:
        """Configure the PLC connection and the byte offsets of the frame.

        Args:
            ip_address: Address assigned to the pylogix ``PLC`` object.
            tag: Tag name; ``":I"`` is appended when reading.
            port: Port index used to derive the byte offsets below.
            interval: Stored as ``self.interval``.
                NOTE(review): presumably the polling period in seconds
                consumed by ``Input`` -- confirm against the base class.
        """
        super().__init__(self.read_handler)
        self.comm = PLC()
        self.comm.IPAddress = ip_address
        self.tag = tag
        self.port = port
        # NOTE(review): the offsets presumably index into the raw payload of
        # the "<tag>:I" read; the constants come from the original code and
        # should be confirmed against the device documentation.
        self.E_offset = 12 + self.port * 48
        self.E_connected_offset = self.E_offset + 32
        self.E_error_offset = self.E_offset + 33
        self.A_offset = 6 + self.port * 32
        self.interval = interval
        # Tracks whether the previous read failed, so that an outage is
        # logged once instead of on every poll.
        self._read_failing = False

    def read_handler(self) -> None:
        """Read the tag once and queue one measurement per channel.

        A read that yields no value is skipped (logged on the first failure
        and again on recovery) instead of failing with an incidental
        ``TypeError`` when the missing payload is subscripted.

        Raises:
            struct.error: If the payload holds fewer than 30 bytes from
                ``E_offset`` onwards.
        """
        # Stamp before the read, as the original code did.
        timestamp = datetime.now(localtz)
        tag_name = f"{self.tag}:I"
        ret = self.comm.Read(tag_name)
        raw = ret.Value

        if raw is None:
            if not self._read_failing:
                logger.warning("Read of %s failed: %s", tag_name, ret.Status)
                self._read_failing = True
            return
        if self._read_failing:
            logger.info("Read of %s recovered", tag_name)
            self._read_failing = False

        # The whole frame is unpacked so that a truncated payload is still
        # rejected, even though only the leading byte values are published.
        data = _E_FRAME.unpack_from(raw, self.E_offset)
        for channel, value in enumerate(data[:_CHANNEL_COUNT]):
            self._q.put(CurrentMeasurement(timestamp, "AB", channel, value))


if __name__ == "__main__":
    with AllenBradleyCPU() as cpu:
        cpu.start()
        # Keep the main thread alive while the input runs.
        while True:
            time.sleep(1)