main.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import logging
  2. from logging.config import dictConfig
  3. import time
  4. import yaml
  5. from importlib import import_module
  6. # read config
  7. with open("config.yml", "r") as f:
  8. config = yaml.load(f, Loader=yaml.FullLoader)
  9. dictConfig(config['Logging'])
  10. logger = logging.getLogger(__name__)
  11. def createModules(configItems, type):
  12. for item in configItems:
  13. cls = next(iter(item))
  14. module = import_module(f"{type}s.{item[cls]}")
  15. if item.get('enabled') == False:
  16. continue
  17. params = item.copy()
  18. params.pop(cls, None)
  19. params.pop('enabled', None)
  20. params.pop('submodules', None)
  21. try:
  22. yield getattr(module, cls)(**params)
  23. except Exception as ex:
  24. logger.exception(F"{type} {cls} couldn't be initialized.")
  25. raise
  26. # setup input modules
  27. inputs = list(createModules(config['Inputs'], "input"))
  28. # setup middlewares recursively
  29. def createMiddlewares(configItems, parent = None):
  30. items = [dict(x, parent=parent) for x in configItems if x.get('enabled') != False]
  31. middlewares = list(createModules(items, "middleware"))
  32. for (item, middleware) in zip(items, middlewares):
  33. if 'submodules' in item:
  34. middleware.submodules = list(createMiddlewares(item['submodules'], middleware))
  35. return middlewares
  36. middlewares = createMiddlewares(config['Middlewares'])
  37. # setup output modules
  38. outputs = list(createModules(config['Outputs'], "output"))
  39. for source in inputs:
  40. source.start()
  41. logger.debug("started sources")
  42. def executeMiddleware(middleware, values):
  43. submodules = getattr(middleware, 'submodules', [])
  44. result = middleware.execute(values)
  45. if not submodules:
  46. return result
  47. else:
  48. subResults = set()
  49. for submodule in submodules:
  50. tmp = executeMiddleware(submodule, result)
  51. if tmp:
  52. subResults.update(tmp)
  53. return subResults
  54. while True:
  55. values = set()
  56. for input in inputs:
  57. values.update(input.read())
  58. results = set()
  59. for middleware in middlewares:
  60. tmp = executeMiddleware(middleware, values)
  61. if tmp:
  62. results.update(tmp)
  63. else:
  64. results = values
  65. for output in outputs:
  66. output.write(results)
  67. time.sleep(1.9)