12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- import logging
- from logging.config import dictConfig
- import time
- import yaml
- from importlib import import_module
- # read config
- with open("config.yml", "r") as f:
- config = yaml.load(f, Loader=yaml.FullLoader)
- dictConfig(config['Logging'])
- logger = logging.getLogger(__name__)
- def createModules(configItems, type):
- for item in configItems:
- cls = next(iter(item))
- module = import_module(f"{type}s.{item[cls]}")
- if item.get('enabled') == False:
- continue
- params = item.copy()
- params.pop(cls, None)
- params.pop('enabled', None)
- params.pop('submodules', None)
- try:
- yield getattr(module, cls)(**params)
- except Exception as ex:
- logger.exception(F"{type} {cls} couldn't be initialized.")
- raise
- # setup input modules
- inputs = list(createModules(config['Inputs'], "input"))
- # setup middlewares recursively
- def createMiddlewares(configItems, parent = None):
- items = [dict(x, parent=parent) for x in configItems if x.get('enabled') != False]
- middlewares = list(createModules(items, "middleware"))
- for (item, middleware) in zip(items, middlewares):
- if 'submodules' in item:
- middleware.submodules = list(createMiddlewares(item['submodules'], middleware))
- return middlewares
- middlewares = createMiddlewares(config['Middlewares'])
- # setup output modules
- outputs = list(createModules(config['Outputs'], "output"))
- for source in inputs:
- source.start()
- logger.debug("started sources")
- def executeMiddleware(middleware, values):
- submodules = getattr(middleware, 'submodules', [])
- result = middleware.execute(values)
- if not submodules:
- return result
- else:
- subResults = set()
- for submodule in submodules:
- tmp = executeMiddleware(submodule, result)
- if tmp:
- subResults.update(tmp)
- return subResults
- while True:
- values = set()
- for input in inputs:
- values.update(input.read())
- results = set()
- for middleware in middlewares:
- tmp = executeMiddleware(middleware, values)
- if tmp:
- results.update(tmp)
- else:
- results = values
- for output in outputs:
- output.write(results)
-
- time.sleep(1.9)
|