# time_correlation.py (1.4 KB)

  1. import logging
  2. from datetime import datetime, timedelta
  3. from dataclasses import dataclass, field
  4. from structures.common import BaseMeasurement
  5. logger = logging.getLogger(__name__)
  6. @dataclass(frozen=True)
  7. class CorrelatedMeasurements(BaseMeasurement):
  8. series: str = field(default="correlated", init=False)
  9. measurement_24v: BaseMeasurement
  10. measurement_480v: BaseMeasurement
  11. measurement_plant: BaseMeasurement
  12. class TimeCorrelation:
  13. def __init__(self, parent):
  14. self.state = {}
  15. self.timestamp = None
  16. def execute(self, values: list):
  17. results = []
  18. for i, measurement in enumerate(values):
  19. self.state[type(measurement).__name__] = measurement
  20. if self.timestamp and self.timestamp > measurement.timestamp + timedelta(milliseconds=100):
  21. logger.error(f"Timestamps are not in order: {measurement.series} is {self.timestamp - measurement.timestamp} to late")
  22. if len(values) > i+1 and values[i+1].timestamp == measurement.timestamp:
  23. continue
  24. self.timestamp = measurement.timestamp
  25. results.append(CorrelatedMeasurements(
  26. timestamp = measurement.timestamp,
  27. source = ','.join([x.source for x in self.state.values()]),
  28. measurement_24v = self.state.get("Measurement24v", None),
  29. measurement_480v = self.state.get("Measurement480v", None),
  30. measurement_plant = self.state.get("CompactLogixState", None) or self.state.get("S7State", None)
  31. ))
  32. return results