import logging
import time
from datetime import datetime, timedelta, timezone

from influxdb_client import InfluxDBClient

from inputs.common import Input

logger = logging.getLogger(__name__)


class Replay(Input):
    """Replay data stored in an InfluxDB bucket, one second per read.

    Every call to :meth:`read_handler` queries the one-second window that
    starts at ``current_time``, enqueues each record found, and then moves
    ``current_time`` to the end of that window.
    """

    def __init__(self, url, token, org, bucket, start_time) -> None:
        """Connect to InfluxDB and position the replay cursor.

        Args:
            url: InfluxDB server URL.
            token: Authentication token.
            org: Organization name passed to the client.
            bucket: Bucket to replay from.
            start_time: First instant to replay, formatted as
                ``"%d.%m.%Y %H:%M:%S"`` (e.g. ``"31.12.2021 23:59:59"``).

        Raises:
            ValueError: If ``start_time`` does not match the expected format.
        """
        # NOTE(review): the base class receives the read callback; presumably
        # it schedules it and owns the ``_q`` queue -- confirm in inputs.common.
        super().__init__(self.read_handler)
        self.interval = 1.0
        self.client = InfluxDBClient(url, token, org=org)
        self.bucket = bucket
        self.query_api = self.client.query_api()
        self.current_time = datetime.strptime(start_time, "%d.%m.%Y %H:%M:%S")

    def read_handler(self):
        """Enqueue the records of the next one-second window and advance."""
        start = self.current_time
        logger.info(start)
        end = start + timedelta(seconds=1)
        for result in self.query(start, end):
            self._q.put(result)
        # Advance only after the window was queried and enqueued, so a failed
        # query leaves the cursor in place.
        self.current_time = end

    @staticmethod
    def _flux_time(moment):
        """Format *moment* as an RFC 3339 UTC timestamp for a Flux query.

        Flux time literals must be RFC 3339; the default ``str(datetime)``
        form (with a space between date and time) is not valid Flux.
        """
        if moment.tzinfo is None:
            # NOTE(review): naive datetimes are treated as UTC here -- confirm
            # that ``start_time`` is meant to be UTC.
            moment = moment.replace(tzinfo=timezone.utc)
        return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    def query(self, start, stop):
        """Return all records of the bucket within ``[start, stop)``.

        Args:
            start: Beginning of the range (``datetime``).
            stop: End of the range (``datetime``).

        Returns:
            A list with one dict per record. Each dict holds the table the
            record came from under ``'series'``, the record time under
            ``'timestamp'``, and every entry of the record's ``values``.
        """
        query = (
            f'from(bucket:"{self.bucket}") '
            f'|> range(start: {self._flux_time(start)}, '
            f'stop: {self._flux_time(stop)})'
        )
        result = self.query_api.query(query=query)

        results = []
        for table in result:
            for record in table.records:
                res = {
                    'series': table,
                    'timestamp': record.get_time(),
                }
                # ``record.values`` is a mapping; merge it into the dict
                # (dicts have ``update``, not ``extend``).
                res.update(record.values)
                results.append(res)
        return results