|
@@ -9,8 +9,8 @@ from structures.plant import CompactLogixState, S7State
|
|
|
dataclasses = [
|
|
|
Measurement24v,
|
|
|
Measurement480v,
|
|
|
+ S7State,
|
|
|
CompactLogixState,
|
|
|
- S7State
|
|
|
]
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
@@ -23,11 +23,11 @@ class Replay(Input):
|
|
|
self.bucket = bucket
|
|
|
|
|
|
self.query_api = self.client.query_api()
|
|
|
- self.current_time = datetime.strptime(start_time, "%d.%m.%Y %H:%M:%S")
|
|
|
+ self.current_time = datetime.strptime(start_time, "%d.%m.%Y %H:%M:%S %z")
|
|
|
+ self.time_offset = datetime.now().astimezone() - self.current_time
|
|
|
|
|
|
def read_handler(self):
|
|
|
start = self.current_time
|
|
|
- logger.info(start)
|
|
|
end = start + timedelta(seconds=1)
|
|
|
for result in self.query(start, end):
|
|
|
self._q.put(result)
|
|
@@ -35,21 +35,39 @@ class Replay(Input):
|
|
|
|
|
|
def query(self, start, stop):
|
|
|
query = f'from(bucket:"{self.bucket}")\
|
|
|
- |> range(start: {start}, stop: {stop})'
|
|
|
+ |> range(start: {start.isoformat()}, stop: {stop.isoformat()})\
|
|
|
+ |> yield(name: "m")'
|
|
|
result = self.query_api.query(query=query)
|
|
|
results = []
|
|
|
+ fields = {}
|
|
|
+ old_dataclass = None
|
|
|
for table in result:
|
|
|
- for dataclass in dataclasses:
|
|
|
- if table in dataclass.__name__.lower():
|
|
|
- break
|
|
|
+
|
|
|
+ if table.records:
|
|
|
+ record = table.records[0]
|
|
|
+ for cls in dataclasses:
|
|
|
+ if record.get_measurement() == cls.series:
|
|
|
+ dataclass = cls
|
|
|
+ break
|
|
|
+
|
|
|
+ if old_dataclass != dataclass:
|
|
|
+ results.extend(self.populate_dataclasses(old_dataclass, fields))
|
|
|
+ fields = {}
|
|
|
+ old_dataclass = dataclass
|
|
|
|
|
|
for record in table.records:
|
|
|
- res = dataclass(
|
|
|
- series = table,
|
|
|
- timestamp = record.get_time(),
|
|
|
- **record.values
|
|
|
- )
|
|
|
- results.append(res)
|
|
|
+ if not record.get_time() in fields:
|
|
|
+ fields[record.get_time()] = {}
|
|
|
+ field = fields[record.get_time()]
|
|
|
+ if 'channel' in record.values:
|
|
|
+ field[record.get_field()] = field[record.get_field()] + (record.get_value(), ) if record.get_field() in field else (record.get_value(), )
|
|
|
+ else:
|
|
|
+ field[record.get_field()] = record.get_value()
|
|
|
+ field['source'] = record['source']
|
|
|
+
|
|
|
+ results.extend(self.populate_dataclasses(dataclass, fields))
|
|
|
return results
|
|
|
|
|
|
-
|
|
|
+ def populate_dataclasses(self, dataclass, fields):
|
|
|
+ for time, values in fields.items():
|
|
|
+ yield dataclass(time + self.time_offset, **values)
|