12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455 |
- import logging, time
- from datetime import datetime, timedelta
- from influxdb_client import InfluxDBClient
- from inputs.common import Input
- from structures.measurement import Measurement24v, Measurement480v
- from structures.plant import CompactLogixState, S7State
- dataclasses = [
- Measurement24v,
- Measurement480v,
- CompactLogixState,
- S7State
- ]
- logger = logging.getLogger(__name__)
- class Replay(Input):
- def __init__(self, url, token, org, bucket, start_time) -> None:
- super().__init__(self.read_handler)
- self.interval = 1.0
- self.client = InfluxDBClient(url, token, org=org)
- self.bucket = bucket
- self.query_api = self.client.query_api()
- self.current_time = datetime.strptime(start_time, "%d.%m.%Y %H:%M:%S")
-
- def read_handler(self):
- start = self.current_time
- logger.info(start)
- end = start + timedelta(seconds=1)
- for result in self.query(start, end):
- self._q.put(result)
- self.current_time = end
- def query(self, start, stop):
- query = f'from(bucket:"{self.bucket}")\
- |> range(start: {start}, stop: {stop})'
- result = self.query_api.query(query=query)
- results = []
- for table in result:
- for dataclass in dataclasses:
- if table in dataclass.__name__.lower():
- break
- for record in table.records:
- res = dataclass(
- series = table,
- timestamp = record.get_time(),
- **record.values
- )
- results.append(res)
- return results
-
|