1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- import logging
- import time
- import yaml
- from importlib import import_module
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
- logger.info("starting")
- # read config
- with open("config.yml", "r") as f:
- config = yaml.load(f, Loader=yaml.FullLoader)
- def createModules(configItems, type):
- for item in configItems:
- cls = next(iter(item))
- module = import_module(f"{type}s.{item[cls]}")
- if item.get('enabled') == False:
- continue
- params = item.copy()
- params.pop(cls, None)
- params.pop('enabled', None)
- params.pop('submodules', None)
- try:
- yield getattr(module, cls)(**params)
- except Exception as ex:
- logger.exception(F"{type} {cls} couldn't be initialized.")
- raise
- # setup input modules
- inputs = list(createModules(config['Inputs'], "input"))
- # setup middlewares recursively
- def createMiddlewares(configItems, parent = None):
- items = [dict(x, parent=parent) for x in configItems if x.get('enabled') != False]
- middlewares = list(createModules(items, "middleware"))
- for (item, middleware) in zip(items, middlewares):
- if 'submodules' in item:
- middleware.submodules = list(createMiddlewares(item['submodules'], middleware))
- return middlewares
- middlewares = createMiddlewares(config['Middlewares'])
- # setup output modules
- outputs = list(createModules(config['Outputs'], "output"))
- for source in inputs:
- source.start()
- logger.info("started sources")
- def executeMiddleware(middleware, values):
- submodules = getattr(middleware, 'submodules', [])
- result = middleware.execute(values)
- if not submodules:
- return result
- else:
- subResults = set()
- for submodule in submodules:
- tmp = executeMiddleware(submodule, result)
- if tmp:
- subResults.update(tmp)
- return subResults
- while True:
- values = set()
- for input in inputs:
- values.update(input.read())
- results = set()
- for middleware in middlewares:
- tmp = executeMiddleware(middleware, values)
- if tmp:
- results.update(tmp)
- else:
- results = values
- for output in outputs:
- output.write(results)
-
- time.sleep(1.9)
|