influxdb.py 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. import logging
  2. from influxdb_client import InfluxDBClient, Point
  3. from influxdb_client.client.write_api import SYNCHRONOUS
  4. import dataclasses
  5. class InfluxDB:
  6. def __init__(self, url):
  7. self.client = InfluxDBClient(url=url, token="XPBViJ3s4JL9_wPffwd5M2EgXj5hcUgT0n4jNhv7m6-NC-6SSxQ3run4kXtWBvOk-FYr1VG5Tj5WcoHgjge9jw==", org="laempe")
  8. self.bucket = "energy-monitor"
  9. self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
  10. self.query_api = self.client.query_api()
  11. def write(self, values):
  12. points = []
  13. for meas in values:
  14. p = Point(meas.series).time(meas.timestamp).tag("source", meas.source)
  15. for field in dataclasses.fields(meas):
  16. if not field.name in ["timestamp", "series", "source"]:
  17. value = getattr(meas, field.name)
  18. if type(value) is bool:
  19. p.field(field.name, int(value))
  20. elif not type(value) is tuple:
  21. p.field(field.name, value)
  22. else:
  23. for i, v in enumerate(value):
  24. pt = Point(meas.series).time(meas.timestamp).tag("source", meas.source).tag("channel", i)
  25. if type(v) is bool:
  26. pt.field(F"{field.name}", int(v))
  27. else:
  28. pt.field(F"{field.name}", v)
  29. points.append(pt)
  30. points.append(p)
  31. try:
  32. self.write_api.write(bucket=self.bucket, record=points)
  33. except Exception as ex:
  34. logging.exception("Influx DB write failed")