123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- import asyncio
- import struct
- from bleak import BleakClient, BleakScanner
- from bleak.backends.characteristic import BleakGATTCharacteristic
- from src.crc import crc16
- from src.protocol import LumiaxClient, ResultContainer, Result
- class BleClient(LumiaxClient):
- DEVICE_NAME_UUID = "00002a00-0000-1000-8000-00805f9b34fb"
- NOTIFY_UUID = "0000ff01-0000-1000-8000-00805f9b34fb"
- WRITE_UUID = "0000ff02-0000-1000-8000-00805f9b34fb"
- buffer = bytearray()
- def __init__(self, mac_address: str):
- self.client = BleakClient(mac_address)
- self.response_queue = asyncio.Queue()
- self.lock = asyncio.Lock()
- super().__init__()
- async def __aenter__(self):
- await self.client.connect() # Connect to the BLE device
- await self.client.start_notify(self.NOTIFY_UUID, self.notification_handler) # Start receiving notifications
- return self
- async def __aexit__(self, exc_type, exc, tb):
- await self.client.stop_notify(self.NOTIFY_UUID) # Stop receiving notifications
- await self.client.disconnect() # Disconnect from the BLE device
- def notification_handler(self, characteristic: BleakGATTCharacteristic, data: bytearray):
- if characteristic.uuid != self.NOTIFY_UUID:
- return
- self.buffer += data # Append the received data to the buffer
- try:
- if not self.is_complete(self.buffer):
- return
- results = self.parse(self.start_address, self.buffer)
- self.response_queue.put_nowait(results)
- except Exception as e:
- print(f"Response from device: 0x{self.buffer.hex()}")
- print(f"Error while parsing response: {e}")
- async def read(self, start_address: int, count: int, repeat = 10, timeout = 2) -> ResultContainer:
- async with self.lock:
- self.start_address = start_address
- command = self.get_read_command(0xFE, start_address, count)
- self.response_queue = asyncio.Queue() # Clear the queue
- i = 0
- # send the command multiple times
- while i < repeat:
- i += 1
- self.buffer = bytearray()
- await self.client.write_gatt_char(self.WRITE_UUID, command)
- try:
- # Wait for either a response or timeout
- return await asyncio.wait_for(self.response_queue.get(), timeout=timeout)
- except asyncio.TimeoutError:
- if self.buffer:
- print(f"Got partial response: 0x{self.buffer.hex()}")
- print(f"Repeating read command...")
- return ResultContainer([])
- async def request_details(self) -> ResultContainer:
- return await self.read(0x3030, 41)
- async def request_parameters(self) -> ResultContainer:
- return await self.read(0x9021, 12)
-
- async def write(self, results: list[Result], repeat = 10, timeout = 2) -> ResultContainer:
- async with self.lock:
- start_address, command = self.get_write_command(self.device_id, results)
- self.start_address = start_address
- self.response_queue = asyncio.Queue() # Clear the queue
- i = 0
- # send the command multiple times
- while i < repeat:
- i += 1
- self.buffer = bytearray()
- await self.client.write_gatt_char(self.WRITE_UUID, command)
- print(f"Wrote command 0x{command.hex()}")
- try:
- # Wait for either a response or timeout
- await asyncio.wait_for(self.response_queue.get(), timeout=timeout)
- return ResultContainer(results)
- except asyncio.TimeoutError:
- if self.buffer:
- print(f"Got partial response: 0x{self.buffer.hex()}")
- print(f"Repeating write command...")
- return ResultContainer([])
- async def get_device_name(self):
- device_name = await self.client.read_gatt_char(self.DEVICE_NAME_UUID) # Read the device name from the BLE device
- return "".join(map(chr, device_name))
- async def list_services(self):
- service_text = "[Service] "
- charact_text = " [Characteristic] "
- descrip_text = " [Descriptor] "
- value_text = " Value = "
- for service in self.client.services:
- print(service_text, service)
- for char in service.characteristics:
- print(charact_text, char, ",".join(char.properties))
- for descriptor in char.descriptors:
- try:
- print(descrip_text, descriptor)
- value = await self.client.read_gatt_descriptor(descriptor.handle)
- print(value_text, "0x" + value.hex())
- except Exception as e:
- pass
|