12345678910111213141516171819202122232425262728293031323334353637383940414243 |
- import logging, time
- from datetime import datetime, timedelta
- from influxdb_client import InfluxDBClient
- from inputs.common import Input
- logger = logging.getLogger(__name__)
class Replay(Input):
    """Input that replays stored InfluxDB data in consecutive one-second windows.

    Each call to ``read_handler`` queries the bucket for the window starting
    at ``current_time``, pushes every record found onto ``self._q`` and then
    advances ``current_time`` to the end of that window.
    """

    def __init__(self, url, token, org, bucket, start_time) -> None:
        """Connect to InfluxDB and set the replay start position.

        Args:
            url: InfluxDB server URL, passed to ``InfluxDBClient``.
            token: Authentication token, passed to ``InfluxDBClient``.
            org: Organization name, passed to ``InfluxDBClient``.
            bucket: Name of the bucket to query.
            start_time: Replay start as a string in
                ``"%d.%m.%Y %H:%M:%S"`` format (e.g. ``"31.12.2021 23:59:59"``).

        Raises:
            ValueError: If ``start_time`` does not match the expected format.
        """
        super().__init__(self.read_handler)
        self.interval = 1.0
        self.client = InfluxDBClient(url, token, org=org)
        self.bucket = bucket
        self.query_api = self.client.query_api()
        self.current_time = datetime.strptime(start_time, "%d.%m.%Y %H:%M:%S")

    def read_handler(self):
        """Query the next one-second window and enqueue its records."""
        start = self.current_time
        logger.info(start)
        end = start + timedelta(seconds=1)
        for result in self.query(start, end):
            # NOTE(review): self._q is presumably the queue provided by the
            # Input base class - confirm against inputs.common.
            self._q.put(result)
        self.current_time = end

    @staticmethod
    def _flux_time(moment):
        """Format a datetime as an RFC3339 literal usable in a Flux ``range()``.

        The default ``str()`` of a datetime contains a space instead of ``T``
        and carries no zone designator, which Flux does not accept as a time
        literal.
        """
        text = moment.isoformat()
        if moment.tzinfo is None:
            # NOTE(review): naive datetimes are assumed to be UTC - confirm
            # that the configured start_time is meant as UTC.
            text += "Z"
        return text

    def query(self, start, stop):
        """Return all records in the bucket between ``start`` and ``stop``.

        Args:
            start: ``datetime`` marking the beginning of the range.
            stop: ``datetime`` marking the end of the range.

        Returns:
            A list of dicts, one per record. Each dict holds the table under
            ``'series'``, the record time under ``'timestamp'``, and every
            entry of ``record.values``.
        """
        query = (
            f'from(bucket:"{self.bucket}")'
            f' |> range(start: {self._flux_time(start)},'
            f' stop: {self._flux_time(stop)})'
        )
        result = self.query_api.query(query=query)
        results = []
        for table in result:
            for record in table.records:
                res = {
                    'series': table,
                    'timestamp': record.get_time(),
                }
                # dict has no extend(); update() merges the record's columns.
                res.update(record.values)
                results.append(res)
        return results
-
|