balluff_html.py 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. from datetime import datetime
  2. import time
  3. import requests, json
  4. import re
  5. from structures.measurement import CurrentMeasurement
  6. class Balluff():
  7. cpu_start_time = None
  8. cpu_last_time = None
  9. local_start_time = time.time()
  10. db = 1
  11. interval = 0.05
  12. url = "http://10.0.10.4/ports.jsn"
  13. port = 0
  14. def read(self):
  15. try:
  16. req = requests.get(self.url)
  17. except requests.exceptions.ConnectionError:
  18. return []
  19. timestamp = datetime.utcnow()
  20. response = json.loads(req.text)
  21. if not re.match("^DF210[01]$", response['ports'][self.port]['productId']):
  22. raise Exception("unsupported device " + response['ports'][self.port]['productId'])
  23. data = response['ports'][self.port]['processInputs'].split(" ")
  24. data = [int(x, 16) for x in data]
  25. status = data[16] << 8 | data[17]
  26. overload = data[18] << 8 | data[19]
  27. short = data[20] << 8 | data[21]
  28. limit = data[22] << 8 | data[23]
  29. buttons = data[24] << 8 | data[25]
  30. undervoltage = data[26] & 1
  31. voltage = data[28] << 8 | data[29]
  32. points = []
  33. for i in range(16):
  34. if status & (1 << i):
  35. points.append(CurrentMeasurement(timestamp, "IO", i, data[i] / 10))
  36. return points
  37. def read_continous(self):
  38. while True:
  39. points = self.read()
  40. for point in points:
  41. yield point