influxdb.py 1.1 KB

123456789101112131415161718192021222324252627282930
  1. from influxdb_client import InfluxDBClient, Point
  2. from influxdb_client.client.write_api import SYNCHRONOUS
  3. import dataclasses
  4. class InfluxDB:
  5. def __init__(self):
  6. self.client = InfluxDBClient(url="http://localhost:8086", token="XPBViJ3s4JL9_wPffwd5M2EgXj5hcUgT0n4jNhv7m6-NC-6SSxQ3run4kXtWBvOk-FYr1VG5Tj5WcoHgjge9jw==", org="laempe")
  7. self.bucket = "energy-monitor"
  8. self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
  9. self.query_api = self.client.query_api()
  10. def write(self, values):
  11. points = []
  12. for meas in values:
  13. p = Point(meas.series).time(meas.timestamp).tag("source", meas.source)
  14. for field in dataclasses.fields(meas):
  15. if not field.name in ["timestamp", "series", "source"]:
  16. value = getattr(meas, field.name)
  17. if not type(value) is tuple:
  18. p.field(field.name, value)
  19. else:
  20. for i, v in enumerate(value):
  21. p.field(F"{field.name}_{i}", v)
  22. points.append(p)
  23. try:
  24. self.write_api.write(bucket=self.bucket, record=points)
  25. except Exception as ex:
  26. print("Influx DB write failed", ex)