|
@@ -1,66 +1,83 @@
|
|
|
-from inputs.snap7_server import SiemensServer
|
|
|
-from inputs.snap7_connect import SiemensCPU
|
|
|
-from inputs.balluff_html import Balluff
|
|
|
-from inputs.allen_bradley_connect import AllenBradleyCPU
|
|
|
-from database import *
|
|
|
import logging
|
|
|
import time
|
|
|
+import yaml
|
|
|
+from importlib import import_module
|
|
|
|
|
|
-logging.basicConfig(level=logging.WARNING)
|
|
|
+logging.basicConfig(level=logging.INFO)
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
|
|
|
-logging.warning("starting")
|
|
|
+logger.info("starting")
|
|
|
|
|
|
-sources = [
|
|
|
- #SiemensCPU("192.168.0.10"),
|
|
|
- SiemensServer(),
|
|
|
- #Balluff(),
|
|
|
- AllenBradleyCPU("192.168.1.15"),
|
|
|
-]
|
|
|
+# read config
|
|
|
+with open("config.yml", "r") as f:
|
|
|
+ config = yaml.load(f, Loader=yaml.FullLoader)
|
|
|
|
|
|
-sinks = [
|
|
|
- InfluxDB("http://influxdb:8086"),
|
|
|
- CSVStorage("logs"),
|
|
|
-]
|
|
|
+def createModules(configItems, type):
|
|
|
+ for item in configItems:
|
|
|
+ cls = next(iter(item))
|
|
|
+ module = import_module(f"{type}s.{item[cls]}")
|
|
|
+ if item.get('enabled') == False:
|
|
|
+ continue
|
|
|
+ params = item.copy()
|
|
|
+ params.pop(cls, None)
|
|
|
+ params.pop('enabled', None)
|
|
|
+ params.pop('submodules', None)
|
|
|
+ try:
|
|
|
+ yield getattr(module, cls)(**params)
|
|
|
+ except Exception as ex:
|
|
|
+ logger.exception(F"{type} {cls} couldn't be initialized.")
|
|
|
+ raise
|
|
|
|
|
|
-for source in sources:
|
|
|
- source.start()
|
|
|
+# setup input modules
|
|
|
+inputs = list(createModules(config['Inputs'], "input"))
|
|
|
|
|
|
-logging.warning("started sources")
|
|
|
+# setup middlewares recursively
|
|
|
+def createMiddlewares(configItems, parent = None):
|
|
|
+ items = [dict(x, parent=parent) for x in configItems if x.get('enabled') != False]
|
|
|
+ middlewares = list(createModules(items, "middleware"))
|
|
|
+ for (item, middleware) in zip(items, middlewares):
|
|
|
+ if 'submodules' in item:
|
|
|
+ middleware.submodules = list(createMiddlewares(item['submodules'], middleware))
|
|
|
+ return middlewares
|
|
|
|
|
|
-startTime = 0
|
|
|
+middlewares = createMiddlewares(config['Middlewares'])
|
|
|
|
|
|
-def printStats(values):
|
|
|
- global startTime
|
|
|
- counts = {}
|
|
|
- dt = time.monotonic() - startTime
|
|
|
- startTime = time.monotonic()
|
|
|
- text = ""
|
|
|
- for meas in values:
|
|
|
- id = "{} {}".format(meas.series, meas.source)
|
|
|
- if id in counts:
|
|
|
- counts[id] += 1
|
|
|
- else:
|
|
|
- counts[id] = 1
|
|
|
- if counts:
|
|
|
- ids = list(counts.keys())
|
|
|
- ids.sort()
|
|
|
- for id in ids:
|
|
|
- text += "{}: {:4d} in {:.03f}s, {:.1f}/s ".format(id, counts[id], dt, counts[id] / dt)
|
|
|
- else:
|
|
|
- text = "0 Messungen in {:.03f}s ".format(dt)
|
|
|
+# setup output modules
|
|
|
+outputs = list(createModules(config['Outputs'], "output"))
|
|
|
|
|
|
- if not counts or len(ids) < 3:
|
|
|
- logging.warning(text)
|
|
|
+
|
|
|
+for source in inputs:
|
|
|
+ source.start()
|
|
|
+
|
|
|
+logger.info("started sources")
|
|
|
+
|
|
|
+def executeMiddleware(middleware, values):
|
|
|
+ submodules = getattr(middleware, 'submodules', [])
|
|
|
+ result = middleware.execute(values)
|
|
|
+ if not submodules:
|
|
|
+ return result
|
|
|
else:
|
|
|
- logging.info(text)
|
|
|
+ subResults = set()
|
|
|
+ for submodule in submodules:
|
|
|
+ tmp = executeMiddleware(submodule, result)
|
|
|
+ if tmp:
|
|
|
+ subResults.update(tmp)
|
|
|
+ return subResults
|
|
|
|
|
|
while True:
|
|
|
- values = []
|
|
|
- for source in sources:
|
|
|
- values.extend(source.read())
|
|
|
+ values = set()
|
|
|
+ for input in inputs:
|
|
|
+ values.update(input.read())
|
|
|
|
|
|
- for sink in sinks:
|
|
|
- sink.write(values)
|
|
|
+ results = set()
|
|
|
+ for middleware in middlewares:
|
|
|
+ tmp = executeMiddleware(middleware, values)
|
|
|
+ if tmp:
|
|
|
+ results.update(tmp)
|
|
|
+ else:
|
|
|
+ results = values
|
|
|
|
|
|
- printStats(values)
|
|
|
+ for output in outputs:
|
|
|
+ output.write(results)
|
|
|
+
|
|
|
time.sleep(1.9)
|