influxdb.py 1.1 KB

1234567891011121314151617181920212223242526272829303132
import dataclasses
import logging
import os

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS


  5. class InfluxDB:
  6. def __init__(self, url):
  7. self.client = InfluxDBClient(url=url, token="XPBViJ3s4JL9_wPffwd5M2EgXj5hcUgT0n4jNhv7m6-NC-6SSxQ3run4kXtWBvOk-FYr1VG5Tj5WcoHgjge9jw==", org="laempe")
  8. self.bucket = "energy-monitor"
  9. self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
  10. self.query_api = self.client.query_api()
  11. def write(self, values):
  12. points = []
  13. for meas in values:
  14. p = Point(meas.series).time(meas.timestamp).tag("source", meas.source)
  15. for field in dataclasses.fields(meas):
  16. if not field.name in ["timestamp", "series", "source"]:
  17. value = getattr(meas, field.name)
  18. if not type(value) is tuple:
  19. p.field(field.name, value)
  20. else:
  21. for i, v in enumerate(value):
  22. p.field(F"{field.name}_{i}", v)
  23. points.append(p)
  24. try:
  25. self.write_api.write(bucket=self.bucket, record=points)
  26. except Exception as ex:
  27. logging.exception("Influx DB write failed")