main.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. import time
  2. import yaml
  3. import argparse
  4. import logging
  5. from logging.config import dictConfig
  6. from importlib import import_module
  7. # get config file from arguments
  8. parser = argparse.ArgumentParser(description='PLC Connector')
  9. parser.add_argument('-c', '--config', type=str, default='config.yml', help='config file')
  10. args = parser.parse_args()
  11. # read config
  12. config = yaml.safe_load(open(args.config, 'r'))
  13. dictConfig(config['Logging'])
  14. logger = logging.getLogger(__name__)
  15. def createModules(configItems, type):
  16. for item in configItems:
  17. cls = next(iter(item))
  18. module = import_module(f"{type}s.{item[cls]}")
  19. if item.get('enabled') == False:
  20. continue
  21. params = item.copy()
  22. params.pop(cls, None)
  23. params.pop('enabled', None)
  24. params.pop('submodules', None)
  25. params.pop('enable_output', None)
  26. try:
  27. yield getattr(module, cls)(**params)
  28. except Exception as ex:
  29. logger.fatal(F"{type} {cls} couldn't be initialized.", exc_info=False)
  30. raise
  31. # setup input modules
  32. inputs = list(createModules(config['Inputs'], "input"))
  33. # setup middlewares recursively
  34. def createMiddlewares(configItems, parent = None):
  35. items = [dict(x, parent=parent) for x in configItems if x.get('enabled') != False]
  36. middlewares = list(createModules(items, "middleware"))
  37. for (item, middleware) in zip(items, middlewares):
  38. if 'submodules' in item:
  39. middleware.submodules = list(createMiddlewares(item['submodules'], middleware))
  40. middleware.enable_output = item.get('enable_output', False)
  41. else:
  42. middleware.enable_output = item.get('enable_output', True)
  43. return middlewares
  44. middlewares = createMiddlewares(config['Middlewares'])
  45. # setup output modules
  46. outputs = list(createModules(config['Outputs'], "output"))
  47. for source in inputs:
  48. source.start()
  49. logger.debug("started sources")
  50. def executeMiddleware(middleware, values):
  51. submodules = getattr(middleware, 'submodules', [])
  52. result = list(middleware.execute(values))
  53. if not submodules and middleware.enable_output:
  54. return result
  55. else:
  56. subResults = []
  57. for submodule in submodules:
  58. subResults += executeMiddleware(submodule, result)
  59. return subResults
  60. while True:
  61. values = set()
  62. for input in inputs:
  63. values.update(input.read())
  64. # sort the set by timestamp and series
  65. values = sorted(values, key=lambda x: (x.timestamp, x.series))
  66. # execute middlewares recursively and collect results of leaf modules
  67. results = set()
  68. for middleware in middlewares:
  69. tmp = executeMiddleware(middleware, values)
  70. if tmp:
  71. results.update(tmp)
  72. if not middlewares:
  73. results = values
  74. # sort the set by timestamp and series
  75. results = sorted(results, key=lambda x: (x.timestamp, x.series))
  76. for output in outputs:
  77. output.write(results)
  78. time.sleep(1.9)