123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- #!/usr/bin/env python3
- import argparse
- import asyncio
- import signal
- import traceback
- import aiomqtt
- from bleak import BleakScanner
- from bleak.exc import BleakError, BleakDeviceNotFoundError
- from src.homeassistant import MqttSensor
- from src.bleclient import BleClient, Result
- from src.variables import variables, VariableContainer, battery_and_load_parameters, switches
- request_interval = 20 # Seconds run_mppt sleeps between successive detail requests
- reconnect_interval = 5 # Seconds run_mqtt sleeps before starting the next BLE session or MQTT connection attempt
async def request_and_publish_details(sensor: MqttSensor, mppt: BleClient) -> None:
    """Request the current details from the MPPT and publish them.

    Prints a one-line battery summary and hands the details to
    ``sensor.publish``. When ``mppt.request_details()`` yields a falsy
    result, nothing is published and a notice is printed instead.

    Args:
        sensor: MQTT sensor the details are published through.
        mppt: BLE client that is queried for the details.
    """
    details = await mppt.request_details()
    if not details:
        print("No values received")
        return

    battery = details['battery_percentage']
    voltage = details['battery_voltage']
    print(f"Battery: {battery.value}% ({voltage.value}V)")
    await sensor.publish(details)
async def subscribe_and_watch(sensor: MqttSensor, mppt: BleClient):
    """Relay commands arriving on the MQTT sensor to the MPPT, indefinitely.

    Subscribes to the first twelve battery/load parameters plus every
    switch, passes the switches to ``sensor.store_config``, and then loops:
    wait for a command, write it to the device, publish the write results.
    The loop has no exit condition, so the coroutine only ends through
    cancellation or an exception.
    """
    topics = battery_and_load_parameters[:12] + switches
    await sensor.subscribe(topics)
    await sensor.store_config(switches)

    while True:
        cmd = await sensor.get_command()
        print(f"Received command to set {cmd.name} to '{cmd.value}'")
        written = await mppt.write([cmd])
        await sensor.publish(written)
async def run_mppt(sensor: MqttSensor, address: str):
    """Run one BLE session: poll the MPPT and relay MQTT commands to it.

    Opens a ``BleClient`` for ``address``, starts ``subscribe_and_watch``
    as a background task, publishes the device parameters once, and then
    publishes the details every ``request_interval`` seconds.

    The session ends quietly on ``EOFError``, a BLE timeout, a missing
    device or any other ``BleakError``. If the background task has failed,
    its exception is re-raised to the caller from the cleanup step.

    Args:
        sensor: MQTT sensor used for publishing and for receiving commands.
        address: Address of the BLE device to connect to.
    """
    watcher = None
    try:
        async with BleClient(address) as mppt:
            watcher = asyncio.create_task(subscribe_and_watch(sensor, mppt))
            parameters = await mppt.request_parameters()
            await sensor.publish(parameters)
            while True:
                await request_and_publish_details(sensor, mppt)
                await asyncio.sleep(request_interval)
                # The watcher loops forever, so a finished watcher means it
                # failed or was cancelled. Leave the polling loop; the
                # cleanup below awaits the task and surfaces its exception.
                # (Task.exception is a method: testing it without calling it
                # is always truthy and would end the session every cycle.)
                if watcher.done():
                    break
    except EOFError:
        # NOTE(review): presumably signals that the BLE link was closed by
        # the peer - confirm against BleClient. Treated as a normal end.
        pass
    except asyncio.TimeoutError:
        print("BLE communication timed out")
        return
    except BleakDeviceNotFoundError:
        print(f"BLE device with address {address} was not found")
        return
    except BleakError as e:
        print(f"BLE error occurred: {e}")
        return
    finally:
        if watcher is not None:
            # cancel() is a no-op on an already finished task; awaiting it
            # then re-raises whatever exception the watcher ended with.
            watcher.cancel()
            try:
                await watcher
            except asyncio.CancelledError:
                pass
        print("BLE device disconnected")
async def run_mqtt(address, host, port, username, password):
    """Keep an MQTT connection alive and run BLE sessions on top of it.

    Connects to the broker and repeatedly calls ``run_mppt``, sleeping
    ``reconnect_interval`` seconds between BLE sessions. When the MQTT
    connection fails, or any other unexpected exception escapes, the error
    is printed and the whole connection is re-established after another
    ``reconnect_interval`` seconds. Only cancellation stops the loop.

    Args:
        address: Address of the BLE device, forwarded to ``run_mppt``.
        host: MQTT broker hostname.
        port: MQTT broker port.
        username: MQTT username.
        password: MQTT password.
    """
    while True:
        try:
            async with MqttSensor(hostname=host, port=port, username=username, password=password) as sensor:
                print(f"Connected to MQTT broker at {host}:{port}")
                while True:
                    await run_mppt(sensor, address)
                    await asyncio.sleep(reconnect_interval)
        except aiomqtt.MqttError as error:
            print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
        except asyncio.CancelledError:
            raise  # Re-raise the CancelledError to stop the task
        except Exception:
            # Top-level boundary: report the failure and retry, never die.
            print(traceback.format_exc())
        await asyncio.sleep(reconnect_interval)
async def main(*args):
    """Run ``run_mqtt(*args)`` until it ends or a termination signal arrives.

    SIGINT and SIGTERM cancel the running task; the resulting
    ``CancelledError`` is swallowed so shutdown produces no error output.

    Args:
        *args: Forwarded unchanged to ``run_mqtt``.
    """
    loop = asyncio.get_running_loop()
    task = loop.create_task(run_mqtt(*args))

    # Setup signal handler to cancel the task on termination
    for signame in ('SIGINT', 'SIGTERM'):
        try:
            loop.add_signal_handler(getattr(signal, signame), task.cancel)
        except NotImplementedError:
            # add_signal_handler is only available on Unix event loops;
            # elsewhere keep running without the handlers instead of
            # crashing at startup.
            pass

    try:
        await task  # Wait for the task to complete
    except asyncio.CancelledError:
        pass  # Task was cancelled, no need for an error message
async def list_services(address):
    """Open a ``BleClient`` for ``address`` and call its ``list_services``."""
    client = BleClient(address)
    async with client as mppt:
        await mppt.list_services()
async def scan_for_devices():
    """Run a BLE discovery, print what was found, and return the result.

    Prints one ``address - name`` line per discovered device, or a notice
    when the discovery result is empty.
    """
    found = await BleakScanner.discover()
    if found:
        print("Available BLE devices:")
        for entry in found:
            print(f"{entry.address} - {entry.name}")
    else:
        print("No BLE devices found.")
    return found
- if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='Solarlife MPPT BLE Client')
- parser.add_argument('address', help='BLE device address')
- parser.add_argument('--host', help='MQTT broker host', default='localhost')
- parser.add_argument('--port', help='MQTT broker port', default=1883, type=int)
- parser.add_argument('--username', help='MQTT username')
- parser.add_argument('--password', help='MQTT password')
- parser.add_argument('--list-services', help='List GATT services', action='store_true')
- parser.add_argument('--scan', help='Scan for bluetooth devices', action='store_true')
- args = parser.parse_args()
- if args.scan:
- asyncio.run(scan_for_devices())
- elif args.list_services:
- asyncio.run(list_services(args.address))
- else:
- asyncio.run(main(args.address, args.host, args.port, args.username, args.password))
|