123456789101112131415161718192021222324252627282930313233343536373839 |
- import logging
- from influxdb_client import InfluxDBClient, Point
- from influxdb_client.client.write_api import SYNCHRONOUS
- import dataclasses
class InfluxDB:
    """Write dataclass-based measurements to InfluxDB via ``influxdb_client``.

    Each measurement passed to :meth:`write` must be a dataclass instance
    exposing ``series``, ``timestamp`` and ``source`` attributes; every other
    dataclass field is written as an InfluxDB field.
    """

    # NOTE(review): SECURITY - this API token is committed to source control.
    # It is kept only so existing callers that pass nothing but ``url`` keep
    # working. Rotate it and supply the real token through the ``token``
    # argument (e.g. read from configuration or the environment).
    DEFAULT_TOKEN = "XPBViJ3s4JL9_wPffwd5M2EgXj5hcUgT0n4jNhv7m6-NC-6SSxQ3run4kXtWBvOk-FYr1VG5Tj5WcoHgjge9jw=="
    DEFAULT_ORG = "laempe"
    DEFAULT_BUCKET = "energy-monitor"

    # Dataclass fields that describe the point itself (measurement name,
    # timestamp, "source" tag) and therefore are not written as fields.
    _META_FIELDS = frozenset(("timestamp", "series", "source"))

    def __init__(self, url, token=None, org=None, bucket=None):
        """Create the client plus its synchronous write API and query API.

        Args:
            url: URL of the InfluxDB server.
            token: API token; defaults to ``DEFAULT_TOKEN`` when ``None``.
            org: Organization name; defaults to ``DEFAULT_ORG`` when ``None``.
            bucket: Bucket that :meth:`write` targets; defaults to
                ``DEFAULT_BUCKET`` when ``None``.
        """
        self.client = InfluxDBClient(
            url=url,
            token=self.DEFAULT_TOKEN if token is None else token,
            org=self.DEFAULT_ORG if org is None else org,
        )
        self.bucket = self.DEFAULT_BUCKET if bucket is None else bucket
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()

    def write(self, values):
        """Convert measurements to points and write them in a single batch.

        Args:
            values: Iterable of dataclass instances (see the class docstring).

        Write failures are best-effort: any exception raised by the write API
        is logged with its traceback and swallowed, so nothing is raised to
        the caller for a failed write. Errors while *building* the points
        (e.g. a value that is not a dataclass) still propagate.
        """
        points = []
        for meas in values:
            points.extend(self._points_for(meas))

        try:
            self.write_api.write(bucket=self.bucket, record=points)
        except Exception:
            logging.exception("Influx DB write failed")

    @staticmethod
    def _new_point(meas):
        """Return an empty point carrying the measurement name, time and source tag."""
        return Point(meas.series).time(meas.timestamp).tag("source", meas.source)

    @staticmethod
    def _field_value(value):
        """Return ``value`` as written to InfluxDB: bools become 0/1 integers."""
        return int(value) if isinstance(value, bool) else value

    @classmethod
    def _points_for(cls, meas):
        """Build all points for one measurement.

        Non-tuple fields are collected on one shared point. Every element of a
        tuple-valued field gets its own point tagged with ``channel`` set to
        the element's index. The per-channel points come first, in field
        order, followed by the shared point.
        """
        shared_point = cls._new_point(meas)
        points = []
        for field in dataclasses.fields(meas):
            if field.name in cls._META_FIELDS:
                continue
            value = getattr(meas, field.name)
            # Exact type check on purpose: tuple subclasses (e.g. namedtuples)
            # are still written as one single field, as before.
            if type(value) is tuple:
                for channel, item in enumerate(value):
                    channel_point = cls._new_point(meas).tag("channel", channel)
                    channel_point.field(field.name, cls._field_value(item))
                    points.append(channel_point)
            else:
                shared_point.field(field.name, cls._field_value(value))
        # The shared point is appended even if it ended up with no fields.
        points.append(shared_point)
        return points
|