# replay_influxdb.py
  1. import logging, time
  2. from datetime import datetime, timedelta
  3. from influxdb_client import InfluxDBClient
  4. from inputs.common import Input
  5. from structures.measurement import Measurement24v, Measurement480v
  6. from structures.plant import CompactLogixState, S7State
  7. dataclasses = [
  8. Measurement24v,
  9. Measurement480v,
  10. CompactLogixState,
  11. S7State
  12. ]
  13. logger = logging.getLogger(__name__)
  14. class Replay(Input):
  15. def __init__(self, url, token, org, bucket, start_time) -> None:
  16. super().__init__(self.read_handler)
  17. self.interval = 1.0
  18. self.client = InfluxDBClient(url, token, org=org)
  19. self.bucket = bucket
  20. self.query_api = self.client.query_api()
  21. self.current_time = datetime.strptime(start_time, "%d.%m.%Y %H:%M:%S")
  22. def read_handler(self):
  23. start = self.current_time
  24. logger.info(start)
  25. end = start + timedelta(seconds=1)
  26. for result in self.query(start, end):
  27. self._q.put(result)
  28. self.current_time = end
  29. def query(self, start, stop):
  30. query = f'from(bucket:"{self.bucket}")\
  31. |> range(start: {start}, stop: {stop})'
  32. result = self.query_api.query(query=query)
  33. results = []
  34. for table in result:
  35. for dataclass in dataclasses:
  36. if table in dataclass.__name__.lower():
  37. break
  38. for record in table.records:
  39. res = dataclass(
  40. series = table,
  41. timestamp = record.get_time(),
  42. **record.values
  43. )
  44. results.append(res)
  45. return results