import logging, time from datetime import datetime, timedelta from influxdb_client import InfluxDBClient from inputs.common import Input from structures.measurement import Measurement24v, Measurement480v from structures.plant import CompactLogixState, S7State dataclasses = [ Measurement24v, Measurement480v, CompactLogixState, S7State ] logger = logging.getLogger(__name__) class Replay(Input): def __init__(self, url, token, org, bucket, start_time) -> None: super().__init__(self.read_handler) self.interval = 1.0 self.client = InfluxDBClient(url, token, org=org) self.bucket = bucket self.query_api = self.client.query_api() self.current_time = datetime.strptime(start_time, "%d.%m.%Y %H:%M:%S") def read_handler(self): start = self.current_time logger.info(start) end = start + timedelta(seconds=1) for result in self.query(start, end): self._q.put(result) self.current_time = end def query(self, start, stop): query = f'from(bucket:"{self.bucket}")\ |> range(start: {start}, stop: {stop})' result = self.query_api.query(query=query) results = [] for table in result: for dataclass in dataclasses: if table in dataclass.__name__.lower(): break for record in table.records: res = dataclass( series = table, timestamp = record.get_time(), **record.values ) results.append(res) return results