# bleclient.py
import asyncio
import struct
from typing import Optional

from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic

from src.crc import crc16
from src.protocol import LumiaxClient, ResultContainer, Result
  7. class BleClient(LumiaxClient):
  8. DEVICE_NAME_UUID = "00002a00-0000-1000-8000-00805f9b34fb"
  9. NOTIFY_UUID = "0000ff01-0000-1000-8000-00805f9b34fb"
  10. WRITE_UUID = "0000ff02-0000-1000-8000-00805f9b34fb"
  11. buffer = bytearray()
  12. def __init__(self, mac_address: str):
  13. self.client = BleakClient(mac_address)
  14. self.response_queue = asyncio.Queue()
  15. self.lock = asyncio.Lock()
  16. async def __aenter__(self):
  17. await self.client.connect() # Connect to the BLE device
  18. await self.client.start_notify(self.NOTIFY_UUID, self.notification_handler) # Start receiving notifications
  19. return self
  20. async def __aexit__(self, exc_type, exc, tb):
  21. await self.client.stop_notify(self.NOTIFY_UUID) # Stop receiving notifications
  22. await self.client.disconnect() # Disconnect from the BLE device
  23. async def notification_handler(self, characteristic: BleakGATTCharacteristic, data: bytearray):
  24. if characteristic.uuid != self.NOTIFY_UUID:
  25. return
  26. self.buffer += data # Append the received data to the buffer
  27. if not self.is_complete(self.buffer):
  28. return
  29. results = self.parse(self.start_address, self.buffer)
  30. self.response_queue.put_nowait(results)
  31. async def read(self, start_address: int, count: int, repeat = 10, timeout = 5) -> ResultContainer:
  32. async with self.lock:
  33. self.start_address = start_address
  34. command = self.get_read_command(0xFE, start_address, count)
  35. self.response_queue = asyncio.Queue() # Clear the queue
  36. i = 0
  37. # send the command multiple times
  38. while self.response_queue.empty() and i < repeat:
  39. i += 1
  40. await self.client.write_gatt_char(self.WRITE_UUID, command)
  41. try:
  42. # Wait for either a response or timeout
  43. return await asyncio.wait_for(self.response_queue.get(), timeout=timeout)
  44. except asyncio.TimeoutError:
  45. pass
  46. return None
  47. async def request_details(self) -> ResultContainer:
  48. return await self.read(0x3030, 43)
  49. async def write(self, results: list[Result], repeat = 10, timeout = 5) -> ResultContainer:
  50. async with self.lock:
  51. start_address, command = self.get_write_command(0xFE, results)
  52. self.response_queue = asyncio.Queue() # Clear the queue
  53. i = 0
  54. # send the command multiple times
  55. while self.response_queue.empty() and i < repeat:
  56. i += 1
  57. await self.client.write_gatt_char(self.WRITE_UUID, command)
  58. try:
  59. # Wait for either a response or timeout
  60. await asyncio.wait_for(self.response_queue.get(), timeout=timeout)
  61. return results
  62. except asyncio.TimeoutError:
  63. pass
  64. return None
  65. async def get_device_name(self):
  66. device_name = await self.client.read_gatt_char(self.DEVICE_NAME_UUID) # Read the device name from the BLE device
  67. return "".join(map(chr, device_name))
  68. async def list_services(self):
  69. service_text = "[Service] "
  70. charact_text = " [Characteristic] "
  71. descrip_text = " [Descriptor] "
  72. value_text = " Value = "
  73. for service in self.client.services:
  74. print(service_text, service)
  75. for char in service.characteristics:
  76. print(charact_text, char, ",".join(char.properties))
  77. for descriptor in char.descriptors:
  78. try:
  79. print(descrip_text, descriptor)
  80. value = await self.client.read_gatt_descriptor(descriptor.handle)
  81. print(value_text, "0x" + value.hex())
  82. except Exception as e:
  83. pass