replay_influxdb.py 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. import logging, time
  2. from datetime import datetime, timedelta
  3. from influxdb_client import InfluxDBClient
  4. from inputs.common import Input
  5. logger = logging.getLogger(__name__)
  6. class Replay(Input):
  7. def __init__(self, url, token, org, bucket, start_time) -> None:
  8. super().__init__(self.read_handler)
  9. self.interval = 1.0
  10. self.client = InfluxDBClient(url, token, org=org)
  11. self.bucket = bucket
  12. self.query_api = self.client.query_api()
  13. self.current_time = datetime.strptime(start_time, "%d.%m.%Y %H:%M:%S")
  14. def read_handler(self):
  15. start = self.current_time
  16. logger.info(start)
  17. end = start + timedelta(seconds=1)
  18. for result in self.query(start, end):
  19. self._q.put(result)
  20. self.current_time = end
  21. def query(self, start, stop):
  22. query = f'from(bucket:"{self.bucket}")\
  23. |> range(start: {start}, stop: {stop})'
  24. result = self.query_api.query(query=query)
  25. results = []
  26. for table in result:
  27. for record in table.records:
  28. res = {
  29. 'series': table,
  30. 'timestamp': record.get_time(),
  31. }
  32. res.extend(record.values)
  33. results.append(res)
  34. return results