123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import sys
- import bluetooth
- import time
class GSV4BT:
    """Client for an ME GSV-4 device reached over a Bluetooth RFCOMM socket.

    The instance keeps the socket, the gain mode selected for each of the
    four channels and the most recently received values.  Incoming bytes are
    split into CRLF-terminated frames by :meth:`recvRaw`; measured-value
    frames update :attr:`values`, response frames are handed to the response
    callback.
    """

    # https://www.manualslib.com/manual/1380505/Me-Gsv-4.html?page=30#manual
    scalings = {
        # name: (ID, value for 0xFFFF, unit)
        # The ID is sent as <p2> of set_gain (0xB2 <p1> <p2>), with p1 = channel.
        '2mV': (0x01, 2.1, 'mV/V'),     # Measuring range ±2 mV/V
        '10mV': (0x02, 10.5, 'mV/V'),   # Measuring range ±10 mV/V
        '5V': (0x03, 5.25, 'V'),        # Measuring range 0-5 V
        '10V': (0x07, 10.5, 'V'),       # Measuring range 0-10 V
        'PT1000': (0x04, 1050, '°C'),   # Measuring range PT1000
        'K': (0x06, 1050, '°C'),        # Measuring range K-thermocouple cable
    }

    frequencies = [
        # (ID, fNom in Hz, fEff in Hz)
        (0xA0, 0.63, 0.625),
        (0xA1, 1.25, 1.250),
        (0xA2, 2.5, 2.500),
        (0xA3, 3.75, 3.750),
        (0xA4, 6.25, 6.250),
        (0xA5, 7.5, 7.500),
        (0xA6, 12.5, 12.40),
        (0xA7, 15, 14.7),
        (0xA8, 25, 24.4),
        (0xA9, 125, 125),
        (0xAA, 250, 250),
        (0xAB, 500, 500),
        (0xAC, 937.5, 900),
    ]

    # Class-level defaults, kept so the attributes exist even on an object
    # created without __init__.  __init__ replaces the two lists with
    # per-instance copies so that instances do not share state.
    channelModes = [None, None, None, None]
    values = [0] * 4
    lastFrameCarry = None   # bytes of an incomplete frame kept for the next recvRaw()
    _valuesCb = None
    _responseCb = None
    _respCode = None
    _respData = None

    def __init__(self, addr):
        """Remember the Bluetooth address *addr*; no connection is opened yet."""
        self.addr = addr
        self.uuid = None
        self.sock = None
        self.requiresSetup = True
        # Per-instance state (the class-level lists would be shared otherwise).
        self.channelModes = [None] * 4
        self.values = [0] * 4

    def printError(self, msg, var=""):
        """Print *msg* and *var* prefixed with an ERROR tag and the device address."""
        print("ERROR: GSV4BT {}:".format(self.addr), msg, var)

    def printWarning(self, msg, var=""):
        """Print *msg* and *var* prefixed with a WARNING tag and the device address."""
        print("WARNING: GSV4BT {}:".format(self.addr), msg, var)

    def connect(self):
        """Open the RFCOMM connection to the device.

        Looks up the serial-port service at ``self.addr`` and connects to the
        port of the first match, then sets a 0.3 s socket timeout.

        Returns:
            True if a socket is (already) open, False if the service was not
            found or connecting failed.  On failure ``self.sock`` stays None
            so that a later call can retry.
        """
        if self.sock:
            return True

        service_matches = bluetooth.find_service(address=self.addr, uuid=bluetooth.SERIAL_PORT_CLASS)
        if not service_matches:
            self.printError("BT device not found.")
            return False

        first_match = service_matches[0]
        self.port = first_match["port"]
        self.name = first_match["name"]
        self.host = first_match["host"]
        self.printWarning("Connecting to \"{}\" on {} port {}".format(self.name, self.host, self.port))

        # Only publish the socket on self.sock once it is actually connected;
        # otherwise isConnected() would report a dead socket as connected.
        sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
        try:
            sock.connect((self.addr, self.port))
        except bluetooth.btcommon.BluetoothError as e:
            self.printError(e)
            sock.close()
            return False
        sock.settimeout(0.3)
        self.sock = sock
        return True

    def isConnected(self):
        """Return True if a socket is currently held, False otherwise."""
        return self.sock is not None

    def sendRaw(self, data):
        """Send *data* over the socket; a no-op when not connected.

        Bluetooth errors are reported through printError, not raised.
        """
        if self.sock:
            try:
                self.sock.send(data)
            except bluetooth.btcommon.BluetoothError as e:
                self.printError("send", e)

    def sendCommand(self, code, *params):
        """Send the command byte *code* followed by the parameter bytes *params*."""
        return self.sendRaw(bytes((code, *params)))

    def setNormalMode(self):
        """Send command 0x26 with parameter 0x01 followed by the ASCII bytes ``berlin``."""
        # NOTE(review): the trailing text is presumably a fixed unlock
        # string defined by the device protocol - confirm against the manual.
        return self.sendRaw(bytes([0x26, 0x01]) + b"berlin")

    def getMode(self):
        """Send command 0x27 and return the payload of its response, or None."""
        self.sendCommand(0x27)
        return self.waitResponse(0x27)

    def getFirmwareVersion(self):
        """Send command 0x2B and return the payload of its response, or None."""
        self.sendCommand(0x2B)
        return self.waitResponse(0x2B)

    def getTxStatus(self):
        """Send command 0x29 and return the payload of its response, or None."""
        self.sendCommand(0x29)
        return self.waitResponse(0x29)

    # https://www.manualslib.com/manual/1380505/Me-Gsv-4.html?page=34#manual
    def setGain(self, channel, scalingName='2mV'):
        """Select the measuring range *scalingName* for *channel* (command 0xB2).

        The chosen range is also remembered so parseValues can scale the raw
        readings of that channel.

        Raises:
            KeyError: *scalingName* is not a key of ``scalings``.
            IndexError: *channel* is outside the four known channels.
        """
        # Resolve the name first so an unknown name cannot corrupt channelModes.
        gain_id = self.scalings[scalingName][0]
        self.channelModes[channel] = scalingName
        self.sendCommand(0xB2, channel, gain_id)
        return self.waitResponse(0xB2)

    def setFrequency(self, freq=10):
        """Select the data rate (command 0x12) for a requested *freq* in Hz.

        The slowest table entry whose nominal rate is at least *freq* is
        used, so asking for a listed nominal rate selects exactly that
        entry.  Requests above the fastest entry select the fastest entry.
        """
        selected = None
        for rate_id, f_nom, _f_eff in self.frequencies:
            selected = rate_id
            if freq <= f_nom:
                break
        self.sendCommand(0x12, selected)
        return self.waitResponse(0x12)

    def startTransmission(self):
        """Send command 0x24 and return the payload of its response, or None."""
        self.sendCommand(0x24)
        return self.waitResponse(0x24)

    def stopTransmission(self):
        """Send command 0x23 and return the payload of its response, or None."""
        self.sendCommand(0x23)
        return self.waitResponse(0x23)

    def getValue(self):
        """Send command 0x3B; does not wait for or return a reply."""
        self.sendCommand(0x3B)

    def recvRaw(self):
        """Read up to 1024 bytes from the socket and dispatch complete frames.

        Frames are delimited by CRLF.  A frame starting with 0xA5 carries
        measured values and must be 11 bytes long; a frame starting with 0x3B
        is a command response.  Frames with any other first byte are skipped.
        Bytes after the last CRLF are kept in ``lastFrameCarry`` and
        prepended to the data of the next call.  Receive errors are reported
        through printError and end the call.
        """
        if not self.sock:
            return
        data = None
        try:
            data = self.sock.recv(1024)
        except bluetooth.btcommon.BluetoothError as e:
            self.printError("receive", e)
        if data is None:
            return
        if self.lastFrameCarry:
            data = self.lastFrameCarry + data
            self.lastFrameCarry = None

        # NOTE(review): splitting on CRLF means a payload that happens to
        # contain the byte pair 0x0D 0x0A is cut short and reported as an
        # invalid frame.
        pos = 0
        while pos < len(data):
            end = data.find(b'\r\n', pos)
            if end == -1:
                # Incomplete frame: keep it until more data arrives.
                self.lastFrameCarry = data[pos:]
                break
            frame = data[pos:end + 2]
            pos = end + 2
            prefix = frame[0]
            if prefix == 0xA5:
                # measured values: prefix + 4 x 2 bytes + CRLF
                if len(frame) != 11:
                    self.printError("invalid values frame: length {}".format(len(frame)), frame)
                    continue
                self.parseValues(frame[1:-2])
            elif prefix == 0x3B:
                # response: prefix, code, n, 2-byte length, 3-byte number, payload, CRLF
                if len(frame) < 10:
                    self.printError("invalid response frame: length {}".format(len(frame)), frame)
                    continue
                code = int(frame[1])
                n = int(frame[2])
                length = int.from_bytes(frame[3:5], 'big', signed=False)
                # A corrupted number field must not abort frame processing.
                no = frame[5:8].decode('ascii', errors='replace')
                if len(frame) != length + 10:
                    self.printError("incorrect response length field: {}".format(length), frame)
                    continue
                self.parseResponse(code, n, no, frame[8:-2])

    def setValuesCb(self, cb):
        """Register *cb*; it is called with the values list after each values frame."""
        self._valuesCb = cb

    def parseValues(self, data):
        """Decode four big-endian unsigned 16-bit readings from *data*.

        Channels with a gain mode set are converted to that mode's unit as
        ``(raw - 32768) / 32768 * scale``; channels without a mode keep the
        raw integer.  The values callback, if any, receives ``self.values``
        (the same list object that is updated on every frame).
        """
        for ch in range(4):
            raw = int.from_bytes(data[ch * 2:ch * 2 + 2], 'big', signed=False)
            mode = self.channelModes[ch]
            if mode is None:
                self.values[ch] = raw
                continue
            # map range to units
            _, scale, _unit = self.scalings[mode]
            self.values[ch] = (raw - 32768) / 32768 * scale
        if self._valuesCb:
            self._valuesCb(self.values)

    def _responseCbWait(self, code, data):
        """Response callback used by waitResponse to capture the reply."""
        self._respCode = code
        self._respData = data

    def setResponseCb(self, cb):
        """Register *cb*; it is called as ``cb(code, data)`` for each response frame."""
        self._responseCb = cb

    def waitResponse(self, sentCode):
        """Do one recvRaw() and return the payload of the response to *sentCode*.

        Returns None when no response frame arrived in that read or when the
        response carries a different command code.  The user's response
        callback is suspended during the read and restored afterwards on
        every path.
        """
        userRespCb = self._responseCb
        self._respCode = None
        self._respData = None
        self.setResponseCb(self._responseCbWait)
        try:
            self.recvRaw()
        finally:
            self.setResponseCb(userRespCb)
        if self._respCode is None:
            return None
        if self._respCode != sentCode:
            self.printError("invalid response code:", hex(self._respCode))
            return None
        return self._respData

    def parseResponse(self, code, n, no, data):
        """Forward a decoded response frame to the response callback.

        Prints a warning when the frame's count byte *n* is greater than 1.
        *no* is accepted but not used.
        """
        if n > 1:
            self.printWarning("more than 1 response:", n - 1)
        if self._responseCb:
            self._responseCb(code, data)

    def getForces(self):
        """Poll the socket once and return the values of the first three channels."""
        self.recvRaw()
        return self.values[0:3]

    def close(self):
        """Close the socket if one is open and mark the device as disconnected."""
        if self.sock is None:
            return
        try:
            self.sock.close()
        finally:
            self.sock = None
def main():
    """Connect to the device, configure it and print measured values forever.

    While connected, the loop polls the socket; each received values frame is
    printed through the values callback.  While not connected, it tries to
    connect and, only when that succeeds, sends the setup commands.  The
    socket is closed when the loop is left (e.g. on Ctrl-C).
    """
    cell = GSV4BT("00:0B:CE:04:F6:66")

    def cb(values):
        print(values)

    try:
        while True:
            if cell.isConnected():
                cell.recvRaw()
                continue
            if not cell.connect():
                # Nothing to configure without a link; back off, then retry.
                time.sleep(0.3)
                continue
            cell.setNormalMode()
            cell.setFrequency(20)  # Hz
            cell.setGain(0, '2mV')
            cell.setGain(1, '2mV')
            cell.setGain(2, '2mV')
            cell.setGain(3, '5V')
            cell.setValuesCb(cb)
            print(cell.getMode())
            time.sleep(0.3)
    finally:
        if cell.isConnected():
            cell.close()


if __name__ == "__main__":
    main()
|