common.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. from threading import Thread
  2. from queue import Queue
  3. import time
  4. import struct
  5. from structures.measurement import Measurement24v
  6. class Input:
  7. _t = None
  8. _stop = False
  9. _q = Queue()
  10. interval = 1
  11. _read_cb = None
  12. def __init__(self, read_cb) -> None:
  13. self._read_cb = read_cb
  14. def start(self):
  15. if not self._t:
  16. self._stop = False
  17. self._t = Thread(target = self._main)
  18. self._t.setDaemon(True)
  19. self._t.start()
  20. def stop(self):
  21. if self._t:
  22. self._stop = True
  23. self._t.join()
  24. self._t = None
  25. def read(self):
  26. while not self._q.empty():
  27. yield self._q.get()
  28. def _main(self):
  29. while not self._stop:
  30. start_time = time.monotonic()
  31. self._read_cb()
  32. end_time = time.monotonic()
  33. remaining = self.interval + start_time - end_time
  34. if remaining > 0:
  35. time.sleep(remaining)
  36. def queue_ifm_from_bytes(self, timestamp, raw, channels = 16):
  37. data = struct.unpack(">" + "B" * 16 + "HHHHHBxH", raw)
  38. current = tuple([x / 10 for x in data[0:channels]])
  39. status = tuple([data[17] & (1 << i) for i in range(channels)])
  40. overload = tuple([data[17] & (1 << i) for i in range(channels)])
  41. short_circuit = tuple([data[17] & (1 << i) for i in range(channels)])
  42. limit = tuple([data[17] & (1 << i) for i in range(channels)])
  43. pushbutton = tuple([data[17] & (1 << i) for i in range(channels)])
  44. voltage = data[22] / 100
  45. self._q.put(Measurement24v(timestamp, "AB", current, status, overload, short_circuit, limit, pushbutton, voltage))