subDesTagesMitExtraKaese 2 лет назад
Родитель
Сommit
f63eb6b591

+ 0 - 3
box-pc/plc-connector/config.yml

@@ -14,9 +14,6 @@ Inputs:
   - AllenBradleyCPU: rockwell.allen_bradley_connect
     host: "192.168.1.15"
     enabled: True
-  
-  - Input: dummy
-    message: Hello World!
 
   - Replay: replay_influxdb
     enabled: False

+ 77 - 0
box-pc/plc-connector/dummy-config.yml

@@ -0,0 +1,77 @@
+
+Inputs:
+  - SiemensCPU: siemens.snap7_connect
+    enabled: False
+    host: "192.168.0.10"
+
+  - SiemensServer: siemens.snap7_server
+    enabled: True
+    port: 102
+
+  - Balluff: balluff.balluff_html
+    enabled: False
+
+  - AllenBradleyCPU: rockwell.allen_bradley_connect
+    enabled: False
+    host: "192.168.1.15"
+  
+  - Input: dummy
+    message: Hello World!
+
+  - Replay: replay_influxdb
+    enabled: False
+    url: "http://influxdb:8086"
+    token: "XPBViJ3s4JL9_wPffwd5M2EgXj5hcUgT0n4jNhv7m6-NC-6SSxQ3run4kXtWBvOk-FYr1VG5Tj5WcoHgjge9jw=="
+    org: "laempe"
+    bucket: "energy-monitor"
+    start_time: 03.05.2022 07:00:00
+
+Middlewares:
+  - PrintStats: print_stats
+    enabled: True
+    enable_output: False
+  - TimeCorrelation: time_correlation
+    submodules:
+    - PrintStats: print_stats
+
+
+Outputs:
+  - CSVStorage: csv_file
+    path: dummy-logs
+    enabled: false
+  - JSONOutput: stdout
+    enabled: True
+
+Logging:
+  version: 1
+  formatters:
+    standard:
+      #format: "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
+      format: "[%(levelname)s] %(name)s: %(message)s"
+  handlers:
+    default:
+      level: INFO
+      formatter: standard
+      class: logging.StreamHandler
+      stream: ext://sys.stderr
+  loggers:
+    '':
+      handlers:
+      - default
+      level: DEBUG
+      propagate: false
+    inputs.rockwell.allen_bradley_connect:
+      handlers:
+      - default
+      level: WARNING
+      propagate: false
+    snap7.server:
+      handlers:
+      - default
+      level: WARNING
+      propagate: false
+    __main__:
+      handlers:
+      - default
+      level: DEBUG
+      propagate: false

+ 20 - 8
box-pc/plc-connector/inputs/dummy.py

@@ -1,6 +1,7 @@
 import logging
 import random
-from datetime import datetime
+import math
+from datetime import datetime, timedelta, time
 from inputs.common import Input as Inp
 
 from structures.measurement import Measurement24v, Measurement480v
@@ -21,12 +22,23 @@ class Input(Inp):
   def __init__(self, message) -> None:
     super().__init__(self.read_handler)
     logger.debug(message)
-    self.interval = 0.01
+    self.interval = 0.1
   
   def read_handler(self):
+    current = datetime.now()
+    current_td = timedelta(
+      hours = current.hour, 
+      minutes = current.minute, 
+      seconds = current.second, 
+      microseconds = current.microsecond)
+
+    # discretize to specified resolution
+    to_sec = timedelta(seconds = round(current_td.total_seconds(), max(0, int(-math.log10(self.interval)))))
+    timestamp = datetime.combine(current, time(0)) + to_sec
+
     self._q.put(Measurement24v(
-      datetime.now(), 
-      "dummy", 
+      timestamp - timedelta(seconds=0.05), 
+      "dummy24v", 
       (f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f()),
       (b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b()),
       (b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b()),
@@ -36,15 +48,15 @@ class Input(Inp):
       f() + 23.5
     ))
     self._q.put(Measurement480v(
-      datetime.now(),
-      "dummy",
+      timestamp - timedelta(seconds=0.03),
+      "dummy480v",
       (f()+230, f()+230, f()+230),
       (f(), f(), f()),
       (i(360), i(360), i(360))
     ))
     self._q.put(CompactLogixState(
-      datetime.now(),
-      "dummy",
+      timestamp,
+      "dummyAB",
       i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), 
       i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), 
       i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), 

+ 28 - 10
box-pc/plc-connector/main.py

@@ -1,12 +1,18 @@
-import logging
-from logging.config import dictConfig
 import time
 import yaml
+import argparse
+import logging
+from logging.config import dictConfig
 from importlib import import_module
 
+# get config file from arguments
+
+parser = argparse.ArgumentParser(description='PLC Connector')
+parser.add_argument('-c', '--config', type=str, default='config.yml', help='config file')
+args = parser.parse_args()
+
 # read config
-with open("config.yml", "r") as f:
-  config = yaml.load(f, Loader=yaml.FullLoader)
+config = yaml.safe_load(open(args.config, 'r'))
 
 dictConfig(config['Logging'])
 logger = logging.getLogger(__name__)
@@ -21,6 +27,7 @@ def createModules(configItems, type):
     params.pop(cls, None)
     params.pop('enabled', None)
     params.pop('submodules', None)
+    params.pop('enable_output', None)
     try:
       yield getattr(module, cls)(**params)
     except Exception as ex:
@@ -37,6 +44,10 @@ def createMiddlewares(configItems, parent = None):
   for (item, middleware) in zip(items, middlewares):
     if 'submodules' in item:
       middleware.submodules = list(createMiddlewares(item['submodules'], middleware))
+      middleware.enable_output = item.get('enable_output', False)
+    else:
+      middleware.enable_output = item.get('enable_output', True)
+
   return middlewares
 
 middlewares = createMiddlewares(config['Middlewares'])
@@ -53,14 +64,13 @@ logger.debug("started sources")
 def executeMiddleware(middleware, values):
   submodules = getattr(middleware, 'submodules', [])
   result = middleware.execute(values)
-  if not submodules:
+  if not submodules and middleware.enable_output:
+    print(middleware.enable_output, type(middleware).__name__, submodules, result)
     return result
   else:
-    subResults = set()
+    subResults = []
     for submodule in submodules:
-      tmp = executeMiddleware(submodule, result)
-      if tmp:
-        subResults.update(tmp)
+      subResults += executeMiddleware(submodule, result)
     return subResults
 
 while True:
@@ -68,14 +78,22 @@ while True:
   for input in inputs:
     values.update(input.read())
 
+  # sort the set by timestamp and series
+  values = sorted(values, key=lambda x: (x.timestamp, x.series))
+
+  # execute middlewares recursively and collect results of leaf modules
+
   results = set()
   for middleware in middlewares:
     tmp = executeMiddleware(middleware, values)
     if tmp:
       results.update(tmp)
-  else:
+  if not middlewares:
     results = values
 
+  # sort the set by timestamp and series
+  results = sorted(results, key=lambda x: (x.timestamp, x.series))
+
   for output in outputs:
     output.write(results)
     

+ 5 - 2
box-pc/plc-connector/middlewares/print_stats.py

@@ -12,6 +12,7 @@ class PrintStats:
     dt = time.monotonic() - self.startTime
     self.startTime = time.monotonic()
     text = ""
+    warn = False
     for meas in values:
       id = "{} {}".format(meas.series, meas.source)
       if id in counts:
@@ -25,8 +26,10 @@ class PrintStats:
         text += "{}: {:4d} in {:.03f}s, {:.1f}/s    ".format(id, counts[id], dt, counts[id] / dt)
     else:
       text = "0 Messungen in {:.03f}s               ".format(dt)
+      warn = True
 
-    if not counts or len(ids) < 3:
+    if warn:
       logger.warning(text)
     else:
-      logger.info(text)
+      logger.info(text)
+    return values

+ 32 - 3
box-pc/plc-connector/middlewares/time_correlation.py

@@ -1,10 +1,39 @@
 import logging
+from datetime import datetime, timedelta
+from dataclasses import dataclass, field
+from structures.common import BaseMeasurement
 
 logger = logging.getLogger(__name__)
 
+@dataclass(frozen=True)
+class CorrelatedMeasurements(BaseMeasurement):
+  series: str = field(default="correlated", init=False)
+  measurement_24v: BaseMeasurement
+  measurement_480v: BaseMeasurement
+  measurement_plant: BaseMeasurement
+
 class TimeCorrelation:
   def __init__(self, parent):
-    pass
+    self.state = {}
+    self.timestamp = None
+
+  def execute(self, values: list):
+    results = []
+    for i, measurement in enumerate(values):
+      self.state[type(measurement).__name__] = measurement
+
+      if self.timestamp and self.timestamp > measurement.timestamp + timedelta(milliseconds=100):
+        logger.error(f"Timestamps are not in order: {measurement.series} is {self.timestamp - measurement.timestamp} to late")
+      
+      if len(values) > i+1 and values[i+1].timestamp == measurement.timestamp:
+        continue
 
-  def execute(self, values):
-    return values
+      self.timestamp = measurement.timestamp
+      results.append(CorrelatedMeasurements(
+        timestamp = measurement.timestamp,
+        source = ','.join([x.source for x in self.state.values()]),
+        measurement_24v = self.state.get("Measurement24v", None),
+        measurement_480v = self.state.get("Measurement480v", None),
+        measurement_plant = self.state.get("CompactLogixState", None) or self.state.get("S7State", None)
+      ))
+    return results

+ 13 - 0
box-pc/plc-connector/outputs/stdout.py

@@ -0,0 +1,13 @@
+import logging
+import json
+from dataclasses import asdict
+
+logger = logging.getLogger(__name__)
+
+class JSONOutput:
+  
+  def write(self, values: set):
+    for measurement in values:
+      d = asdict(measurement)
+      d['timestamp'] = d['timestamp'].isoformat()
+      print(json.dumps(d))