1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
- from datetime import datetime
- import time
- import requests, json
- import re
- from structures.measurement import CurrentMeasurement
- class Balluff():
- cpu_start_time = None
- cpu_last_time = None
- local_start_time = time.time()
- db = 1
- interval = 0.05
- url = "http://10.0.10.4/ports.jsn"
- port = 0
- def read(self):
- try:
- req = requests.get(self.url)
- except requests.exceptions.ConnectionError:
- return []
- timestamp = datetime.utcnow()
- response = json.loads(req.text)
- if not re.match("^DF210[01]$", response['ports'][self.port]['productId']):
- raise Exception("unsupported device " + response['ports'][self.port]['productId'])
- data = response['ports'][self.port]['processInputs'].split(" ")
- data = [int(x, 16) for x in data]
- status = data[16] << 8 | data[17]
- overload = data[18] << 8 | data[19]
- short = data[20] << 8 | data[21]
- limit = data[22] << 8 | data[23]
- buttons = data[24] << 8 | data[25]
- undervoltage = data[26] & 1
- voltage = data[28] << 8 | data[29]
- points = []
- for i in range(16):
- if status & (1 << i):
- points.append(CurrentMeasurement(timestamp, "IO", i, data[i] / 10))
- return points
- def read_continous(self):
- while True:
- points = self.read()
- for point in points:
- yield point
-
|