Browse Source

add more middlewares

subDesTagesMitExtraKaese 2 years ago
parent
commit
37deaccf27

+ 2 - 2
box-pc/plc-connector/config.yml

@@ -24,11 +24,11 @@ Inputs:
     start_time: 03.05.2022 07:00:00
 
 Middlewares:
-  - PrintStats: print_stats
+  - PrintStats: debug
     enabled: False
   - TimeCorrelation: time_correlation
     submodules:
-    - PrintStats: print_stats
+    - PrintStats: debug
 
 Outputs:
   - CSVStorage: csv_file

+ 17 - 6
box-pc/plc-connector/dummy-config.yml

@@ -5,7 +5,7 @@ Inputs:
     host: "192.168.0.10"
 
   - SiemensServer: siemens.snap7_server
-    enabled: True
+    enabled: False
     port: 102
 
   - Balluff: balluff.balluff_html
@@ -27,20 +27,31 @@ Inputs:
     start_time: 03.05.2022 07:00:00
 
 Middlewares:
-  - PrintStats: print_stats
-    enabled: True
+  - PrintStats: debug
+    enabled: False
     enable_output: False
   - TimeCorrelation: time_correlation
     submodules:
-    - PrintStats: print_stats
+    - MatchAny: filters
+      series: plant
+      enable_output: True
+      table_move_up: 1
+    - ComplexFilter: filters
+      predicate: "measurement_24v and avg(measurement_24v.current) > 0.6"
+      submodules:
+      - ComplexSelector: selectors
+        selector: "avg(measurement_480v.current)"
+        submodules:
+        - PrintStats: debug
+          enable_output: True
 
 
 Outputs:
   - CSVStorage: csv_file
     path: dummy-logs
-    enabled: false
-  - JSONOutput: stdout
     enabled: True
+  - JSONOutput: stdout
+    enabled: False
 
 Logging:
   version: 1

+ 2 - 1
box-pc/plc-connector/inputs/dummy.py

@@ -8,6 +8,7 @@ from structures.measurement import Measurement24v, Measurement480v
 from structures.plant import S7State, CompactLogixState
 
 logger = logging.getLogger(__name__)
+localtz = datetime.now().astimezone().tzinfo
 
 def f():
   return random.random()
@@ -34,7 +35,7 @@ class Input(Inp):
 
     # discretize to specified resolution
     to_sec = timedelta(seconds = round(current_td.total_seconds(), max(0, int(-math.log10(self.interval)))))
-    timestamp = datetime.combine(current, time(0)) + to_sec
+    timestamp = (datetime.combine(current, time(0)) + to_sec).astimezone(localtz)
 
     self._q.put(Measurement24v(
       timestamp - timedelta(seconds=0.05), 

+ 2 - 3
box-pc/plc-connector/main.py

@@ -31,7 +31,7 @@ def createModules(configItems, type):
     try:
       yield getattr(module, cls)(**params)
     except Exception as ex:
-      logger.exception(F"{type} {cls} couldn't be initialized.")
+      logger.fatal(F"{type} {cls} couldn't be initialized.", exc_info=False)
       raise
 
 # setup input modules
@@ -63,9 +63,8 @@ logger.debug("started sources")
 
 def executeMiddleware(middleware, values):
   submodules = getattr(middleware, 'submodules', [])
-  result = middleware.execute(values)
+  result = list(middleware.execute(values))
   if not submodules and middleware.enable_output:
-    print(middleware.enable_output, type(middleware).__name__, submodules, result)
     return result
   else:
     subResults = []

+ 0 - 0
box-pc/plc-connector/middlewares/aggregators.py


+ 7 - 3
box-pc/plc-connector/middlewares/common.py

@@ -1,10 +1,14 @@
+from structures.common import BaseMeasurement
 
 class MatchSeries:
   def __init__(self, series) -> None:
     self._series = series
   
-  def get_series(self, measurement):
-    if getattr(measurement, 'series', None) == self._series:
+  def get_series(self, measurement: BaseMeasurement):
+    if measurement.series == self._series:
       return measurement
     else:
-      return getattr(measurement, self._series, None)
+      # find the series in the data
+      for key, value in measurement.__dict__.items():
+        if isinstance(value, BaseMeasurement) and value.series == self._series:
+          return value

+ 10 - 1
box-pc/plc-connector/middlewares/print_stats.py → box-pc/plc-connector/middlewares/debug.py

@@ -32,4 +32,13 @@ class PrintStats:
       logger.warning(text)
     else:
       logger.info(text)
-    return values
+    return values
+
+class Warning:
+  def __init__(self, parent):
+    pass
+
+  def execute(self, values):
+    for meas in values:
+      logger.warning(str(meas))
+      yield meas

+ 0 - 27
box-pc/plc-connector/middlewares/filter.py

@@ -1,27 +0,0 @@
-
-from .common import MatchSeries
-
-class Filter(MatchSeries):
-  def __init__(self, field = None, value = None, **kwargs) -> None:
-    super().__init__(**kwargs)
-    self._field = field
-    self._value = value
-
-  def execute(self, values):
-    for measurement in values:
-      dataset = self.get_series(measurement)
-
-      if not dataset:
-        continue
-
-      if self._field:
-        value = getattr(dataset, self._field, None)
-        if not value:
-          continue
-        if self._value and self._value != value:
-          continue
-      
-      yield measurement
-      
-      
-

+ 92 - 0
box-pc/plc-connector/middlewares/filters.py

@@ -0,0 +1,92 @@
+import logging
+import math
+import dataclasses
+from .common import MatchSeries
+from structures.measurement import Measurement24v, Measurement480v
+from structures.plant import S7State, CompactLogixState
+from structures.correlated import CorrelatedMeasurements
+
+
+logger = logging.getLogger(__name__)
+"""
+This middleware filters the measurements by series and yields if any given field matches.
+"""
+class MatchAny(MatchSeries):
+  def __init__(self, parent, series, **kwargs) -> None:
+    super().__init__(series)
+    self._fields = kwargs
+
+  def execute(self, values):
+    for measurement in values:
+      dataset = self.get_series(measurement)
+
+      if not dataset:
+        continue
+      
+      if not self._fields:
+        yield measurement
+        continue
+
+      # check if any field matches
+      for field, value in self._fields.items():
+        if getattr(dataset, field, None) == value:
+          yield measurement
+          break
+      
+"""
+This middleware filters the measurements by series and yields if all given fields match.
+"""
+class MatchAll(MatchSeries):
+  def __init__(self, parent, series, **kwargs) -> None:
+    super().__init__(series)
+    self._fields = kwargs
+
+  def execute(self, values):
+    for measurement in values:
+      dataset = self.get_series(measurement)
+      if not dataset:
+        continue
+
+      # check if all fields match
+      success = True
+      for field, value in self._fields.items():
+        if getattr(dataset, field, None) != value:
+          success = False
+          break
+      if success:
+        yield measurement
+
+
+ALLOWED_NAMES = \
+  [x.name for x in dataclasses.fields(Measurement24v)] + \
+  [x.name for x in dataclasses.fields(Measurement480v)] + \
+  [x.name for x in dataclasses.fields(CompactLogixState)] + \
+  [x.name for x in dataclasses.fields(S7State)] + \
+  [x.name for x in dataclasses.fields(CorrelatedMeasurements)] + \
+  ['sum', 'min', 'max', 'avg', 'count', 'last']
+
+ALLOWED_NAMES = set([name for name in ALLOWED_NAMES if not name.startswith('_')])
+
+class ComplexFilter():
+  def __init__(self, parent, predicate) -> None:
+    self._predicate = predicate
+    self._compiled = compile(predicate, "<string>", "eval")
+    # Validate allowed names
+    for name in self._compiled.co_names:
+        if name not in ALLOWED_NAMES:
+            raise NameError(f"The use of '{name}' is not allowed in '{predicate}'")
+
+  def execute(self, values):
+    for measurement in values:
+      try:
+        if eval(self._compiled, {"__builtins__": {
+          'sum': sum,
+          'min': min,
+          'max': max,
+          'avg': lambda x: sum(x) / len(x),
+          'count': len,
+          'last': lambda x: x[-1],
+        }}, measurement.__dict__):
+          yield measurement
+      except Exception as e:
+        logger.error(f"Error while evaluating predicate '{self._predicate}': {e}")

+ 53 - 0
box-pc/plc-connector/middlewares/selectors.py

@@ -0,0 +1,53 @@
+import logging
+import re
+import dataclasses
+from .common import MatchSeries
+from structures.measurement import Measurement24v, Measurement480v
+from structures.plant import S7State, CompactLogixState
+from structures.correlated import CorrelatedMeasurements
+from structures.common import BaseMeasurement
+
+logger = logging.getLogger(__name__)
+
+ALLOWED_NAMES = \
+  [x.name for x in dataclasses.fields(Measurement24v)] + \
+  [x.name for x in dataclasses.fields(Measurement480v)] + \
+  [x.name for x in dataclasses.fields(CompactLogixState)] + \
+  [x.name for x in dataclasses.fields(S7State)] + \
+  [x.name for x in dataclasses.fields(CorrelatedMeasurements)] + \
+  ['sum', 'min', 'max', 'avg', 'count', 'last']
+
+ALLOWED_NAMES = set([name for name in ALLOWED_NAMES if not name.startswith('_')])
+
+@dataclasses.dataclass(frozen=True)
+class Selection(BaseMeasurement):
+  value: str
+
+class ComplexSelector():
+  def __init__(self, parent, selector) -> None:
+    self._selector = selector
+    self._compiled = compile(selector, "<string>", "eval")
+    # Validate allowed names
+    for name in self._compiled.co_names:
+        if name not in ALLOWED_NAMES:
+            raise NameError(f"The use of '{name}' is not allowed in '{selector}'")
+
+  def execute(self, values):
+    for measurement in values:
+      try:
+        value = eval(self._compiled, {"__builtins__": {
+          'sum': sum,
+          'min': min,
+          'max': max,
+          'avg': lambda x: sum(x) / len(x),
+          'count': len,
+          'last': lambda x: x[-1],
+        }}, measurement.__dict__)
+        yield Selection(
+          timestamp = measurement.timestamp,
+          series = re.match(r"[\w_\.\-\(\)]+", self._selector).group(0),
+          source = "selection",
+          value=value
+        )
+      except Exception as e:
+        logger.error(f"Error while evaluating selector '{self._selector}': {e}")

+ 3 - 10
box-pc/plc-connector/middlewares/time_correlation.py

@@ -1,16 +1,9 @@
 import logging
-from datetime import datetime, timedelta
-from dataclasses import dataclass, field
-from structures.common import BaseMeasurement
+from datetime import timedelta
 
-logger = logging.getLogger(__name__)
+from structures.correlated import CorrelatedMeasurements
 
-@dataclass(frozen=True)
-class CorrelatedMeasurements(BaseMeasurement):
-  series: str = field(default="correlated", init=False)
-  measurement_24v: BaseMeasurement
-  measurement_480v: BaseMeasurement
-  measurement_plant: BaseMeasurement
+logger = logging.getLogger(__name__)
 
 class TimeCorrelation:
   def __init__(self, parent):

+ 9 - 5
box-pc/plc-connector/outputs/csv_file.py

@@ -5,6 +5,8 @@ import dataclasses
 import zipfile
 import logging
 
+from structures.common import BaseMeasurement
+
 logger = logging.getLogger(__name__)
 
 class CSVStorage:
@@ -63,13 +65,15 @@ class CSVFile:
       self.row_count = 0
 
 
-def dataclass_to_dict(dc):
+def dataclass_to_dict(dc, prefix=""):
   ret = {}
   for field in dataclasses.fields(dc):
     value = getattr(dc, field.name)
-    if not type(value) is tuple:
-      ret[field.name] = value
-    else:
+    if type(value) is tuple:
       for i, v in enumerate(value):
-        ret[F"{field.name}_{i}"] = v
+        ret[F"{prefix}{field.name}_{i}"] = v
+    elif isinstance(value, BaseMeasurement):
+      ret.update(dataclass_to_dict(value, F"{prefix}{value.series}_"))
+    else:
+      ret[F"{prefix}{field.name}"] = value
   return ret

+ 9 - 2
box-pc/plc-connector/outputs/stdout.py

@@ -1,13 +1,20 @@
 import logging
+import datetime
 import json
 from dataclasses import asdict
 
 logger = logging.getLogger(__name__)
 
+class DateTimeEncoder(json.JSONEncoder):
+  def default(self, z):
+    if isinstance(z, datetime.datetime):
+      return z.isoformat()
+    else:
+      return super().default(z)
+
 class JSONOutput:
   
   def write(self, values: set):
     for measurement in values:
       d = asdict(measurement)
-      d['timestamp'] = d['timestamp'].isoformat()
-      print(json.dumps(d))
+      print(json.dumps(d, cls=DateTimeEncoder))

+ 9 - 0
box-pc/plc-connector/structures/correlated.py

@@ -0,0 +1,9 @@
+from dataclasses import dataclass, field
+from structures.common import BaseMeasurement
+
+@dataclass(frozen=True)
+class CorrelatedMeasurements(BaseMeasurement):
+  series: str = field(default="correlated", init=False)
+  measurement_24v: BaseMeasurement
+  measurement_480v: BaseMeasurement
+  measurement_plant: BaseMeasurement