import logging import time import yaml from importlib import import_module logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) logger.info("starting") # read config with open("config.yml", "r") as f: config = yaml.load(f, Loader=yaml.FullLoader) def createModules(configItems, type): for item in configItems: cls = next(iter(item)) module = import_module(f"{type}s.{item[cls]}") if item.get('enabled') == False: continue params = item.copy() params.pop(cls, None) params.pop('enabled', None) params.pop('submodules', None) try: yield getattr(module, cls)(**params) except Exception as ex: logger.exception(F"{type} {cls} couldn't be initialized.") raise # setup input modules inputs = list(createModules(config['Inputs'], "input")) # setup middlewares recursively def createMiddlewares(configItems, parent = None): items = [dict(x, parent=parent) for x in configItems if x.get('enabled') != False] middlewares = list(createModules(items, "middleware")) for (item, middleware) in zip(items, middlewares): if 'submodules' in item: middleware.submodules = list(createMiddlewares(item['submodules'], middleware)) return middlewares middlewares = createMiddlewares(config['Middlewares']) # setup output modules outputs = list(createModules(config['Outputs'], "output")) for source in inputs: source.start() logger.info("started sources") def executeMiddleware(middleware, values): submodules = getattr(middleware, 'submodules', []) result = middleware.execute(values) if not submodules: return result else: subResults = set() for submodule in submodules: tmp = executeMiddleware(submodule, result) if tmp: subResults.update(tmp) return subResults while True: values = set() for input in inputs: values.update(input.read()) results = set() for middleware in middlewares: tmp = executeMiddleware(middleware, values) if tmp: results.update(tmp) else: results = values for output in outputs: output.write(results) time.sleep(1.9)