bleclient.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. import asyncio
  2. import struct
  3. from bleak import BleakClient, BleakScanner
  4. from bleak.backends.characteristic import BleakGATTCharacteristic
  5. from src.crc import crc16
  6. from src.protocol import LumiaxClient, ResultContainer, Result
class BleClient(LumiaxClient):
    """Bluetooth LE transport for ``LumiaxClient`` built on ``bleak``.

    Commands are written to ``WRITE_UUID``; replies arrive as notifications
    on ``NOTIFY_UUID`` and are accumulated in ``buffer``.
    """

    # GATT characteristic read by get_device_name().
    DEVICE_NAME_UUID = "00002a00-0000-1000-8000-00805f9b34fb"
    # GATT characteristic subscribed to for incoming notifications.
    NOTIFY_UUID = "0000ff01-0000-1000-8000-00805f9b34fb"
    # GATT characteristic that read/write commands are sent to.
    WRITE_UUID = "0000ff02-0000-1000-8000-00805f9b34fb"
    # Class-level default receive buffer. NOTE(review): this one mutable
    # bytearray is shared by all instances until an instance assigns its
    # own ``self.buffer``.
    buffer = bytearray()
  12. def __init__(self, mac_address: str):
  13. self.client = BleakClient(mac_address)
  14. self.response_queue = asyncio.Queue()
  15. self.lock = asyncio.Lock()
  16. super().__init__()
  17. async def __aenter__(self):
  18. await self.client.connect() # Connect to the BLE device
  19. await self.client.start_notify(self.NOTIFY_UUID, self.notification_handler) # Start receiving notifications
  20. return self
  21. async def __aexit__(self, exc_type, exc, tb):
  22. await self.client.stop_notify(self.NOTIFY_UUID) # Stop receiving notifications
  23. await self.client.disconnect() # Disconnect from the BLE device
  24. def notification_handler(self, characteristic: BleakGATTCharacteristic, data: bytearray):
  25. if characteristic.uuid != self.NOTIFY_UUID:
  26. return
  27. self.buffer += data # Append the received data to the buffer
  28. try:
  29. if not self.is_complete(self.buffer):
  30. return
  31. results = self.parse(self.start_address, self.buffer)
  32. self.response_queue.put_nowait(results)
  33. except Exception as e:
  34. print(f"Response from device: 0x{self.buffer.hex()}")
  35. print(f"Error while parsing response: {e}")
  36. async def read(self, start_address: int, count: int, repeat = 10, timeout = 2) -> ResultContainer:
  37. async with self.lock:
  38. self.start_address = start_address
  39. command = self.get_read_command(0xFE, start_address, count)
  40. self.response_queue = asyncio.Queue() # Clear the queue
  41. i = 0
  42. # send the command multiple times
  43. while i < repeat:
  44. i += 1
  45. self.buffer = bytearray()
  46. await self.client.write_gatt_char(self.WRITE_UUID, command)
  47. try:
  48. # Wait for either a response or timeout
  49. return await asyncio.wait_for(self.response_queue.get(), timeout=timeout)
  50. except asyncio.TimeoutError:
  51. if self.buffer:
  52. print(f"Got partial response: 0x{self.buffer.hex()}")
  53. print(f"Repeating read command...")
  54. return ResultContainer([])
  55. async def request_details(self) -> ResultContainer:
  56. return await self.read(0x3030, 41)
  57. async def request_parameters(self) -> ResultContainer:
  58. return await self.read(0x9021, 12)
  59. async def write(self, results: list[Result], repeat = 10, timeout = 2) -> ResultContainer:
  60. async with self.lock:
  61. start_address, command = self.get_write_command(self.device_id, results)
  62. self.start_address = start_address
  63. self.response_queue = asyncio.Queue() # Clear the queue
  64. i = 0
  65. # send the command multiple times
  66. while i < repeat:
  67. i += 1
  68. self.buffer = bytearray()
  69. await self.client.write_gatt_char(self.WRITE_UUID, command)
  70. print(f"Wrote command 0x{command.hex()}")
  71. try:
  72. # Wait for either a response or timeout
  73. await asyncio.wait_for(self.response_queue.get(), timeout=timeout)
  74. return ResultContainer(results)
  75. except asyncio.TimeoutError:
  76. if self.buffer:
  77. print(f"Got partial response: 0x{self.buffer.hex()}")
  78. print(f"Repeating write command...")
  79. return ResultContainer([])
  80. async def get_device_name(self):
  81. device_name = await self.client.read_gatt_char(self.DEVICE_NAME_UUID) # Read the device name from the BLE device
  82. return "".join(map(chr, device_name))
  83. async def list_services(self):
  84. service_text = "[Service] "
  85. charact_text = " [Characteristic] "
  86. descrip_text = " [Descriptor] "
  87. value_text = " Value = "
  88. for service in self.client.services:
  89. print(service_text, service)
  90. for char in service.characteristics:
  91. print(charact_text, char, ",".join(char.properties))
  92. for descriptor in char.descriptors:
  93. try:
  94. print(descrip_text, descriptor)
  95. value = await self.client.read_gatt_descriptor(descriptor.handle)
  96. print(value_text, "0x" + value.hex())
  97. except Exception as e:
  98. pass