main.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import logging
  2. import time
  3. import yaml
  4. from importlib import import_module
  5. logging.basicConfig(level=logging.INFO)
  6. logger = logging.getLogger(__name__)
  7. logger.info("starting")
  8. # read config
  9. with open("config.yml", "r") as f:
  10. config = yaml.load(f, Loader=yaml.FullLoader)
  11. def createModules(configItems, type):
  12. for item in configItems:
  13. cls = next(iter(item))
  14. module = import_module(f"{type}s.{item[cls]}")
  15. if item.get('enabled') == False:
  16. continue
  17. params = item.copy()
  18. params.pop(cls, None)
  19. params.pop('enabled', None)
  20. params.pop('submodules', None)
  21. try:
  22. yield getattr(module, cls)(**params)
  23. except Exception as ex:
  24. logger.exception(F"{type} {cls} couldn't be initialized.")
  25. raise
  26. # setup input modules
  27. inputs = list(createModules(config['Inputs'], "input"))
  28. # setup middlewares recursively
  29. def createMiddlewares(configItems, parent = None):
  30. items = [dict(x, parent=parent) for x in configItems if x.get('enabled') != False]
  31. middlewares = list(createModules(items, "middleware"))
  32. for (item, middleware) in zip(items, middlewares):
  33. if 'submodules' in item:
  34. middleware.submodules = list(createMiddlewares(item['submodules'], middleware))
  35. return middlewares
  36. middlewares = createMiddlewares(config['Middlewares'])
  37. # setup output modules
  38. outputs = list(createModules(config['Outputs'], "output"))
  39. for source in inputs:
  40. source.start()
  41. logger.info("started sources")
  42. def executeMiddleware(middleware, values):
  43. submodules = getattr(middleware, 'submodules', [])
  44. result = middleware.execute(values)
  45. if not submodules:
  46. return result
  47. else:
  48. subResults = set()
  49. for submodule in submodules:
  50. tmp = executeMiddleware(submodule, result)
  51. if tmp:
  52. subResults.update(tmp)
  53. return subResults
  54. while True:
  55. values = set()
  56. for input in inputs:
  57. values.update(input.read())
  58. results = set()
  59. for middleware in middlewares:
  60. tmp = executeMiddleware(middleware, values)
  61. if tmp:
  62. results.update(tmp)
  63. else:
  64. results = values
  65. for output in outputs:
  66. output.write(results)
  67. time.sleep(1.9)