from threading import Thread from queue import Queue import time import struct from structures.measurement import Measurement24v class Input: _t = None _stop = False _q = Queue() interval = 1 _read_cb = None def __init__(self, read_cb) -> None: self._read_cb = read_cb def start(self): if not self._t: self._stop = False self._t = Thread(target = self._main) self._t.setDaemon(True) self._t.start() def stop(self): if self._t: self._stop = True self._t.join() self._t = None def read(self): while not self._q.empty(): yield self._q.get() def _main(self): while not self._stop: start_time = time.monotonic() self._read_cb() end_time = time.monotonic() remaining = self.interval + start_time - end_time if remaining > 0: time.sleep(remaining) def queue_ifm_from_bytes(self, timestamp, raw, channels = 16): data = struct.unpack(">" + "B" * 16 + "HHHHHBxH", raw) current = tuple([x / 10 for x in data[0:channels]]) status = tuple([data[17] & (1 << i) for i in range(channels)]) overload = tuple([data[17] & (1 << i) for i in range(channels)]) short_circuit = tuple([data[17] & (1 << i) for i in range(channels)]) limit = tuple([data[17] & (1 << i) for i in range(channels)]) pushbutton = tuple([data[17] & (1 << i) for i in range(channels)]) voltage = data[22] / 100 self._q.put(Measurement24v(timestamp, "AB", current, status, overload, short_circuit, limit, pushbutton, voltage))