main.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. import time
  2. import yaml
  3. import argparse
  4. import logging
  5. from logging.config import dictConfig
  6. from importlib import import_module
  7. # get config file from arguments
  8. parser = argparse.ArgumentParser(description='PLC Connector')
  9. parser.add_argument('-c', '--config', type=str, default='config.yml', help='config file')
  10. args = parser.parse_args()
  11. # read config
  12. config = yaml.safe_load(open(args.config, 'r'))
  13. dictConfig(config['Logging'])
  14. logger = logging.getLogger(__name__)
  15. def createModules(configItems, type):
  16. for item in configItems:
  17. cls = next(iter(item))
  18. module = import_module(f"{type}s.{item[cls]}")
  19. if item.get('enabled') == False:
  20. continue
  21. params = item.copy()
  22. params.pop(cls, None)
  23. params.pop('enabled', None)
  24. params.pop('submodules', None)
  25. params.pop('enable_output', None)
  26. try:
  27. yield getattr(module, cls)(**params)
  28. except Exception as ex:
  29. logger.exception(F"{type} {cls} couldn't be initialized.")
  30. raise
  31. # setup input modules
  32. inputs = list(createModules(config['Inputs'], "input"))
  33. # setup middlewares recursively
  34. def createMiddlewares(configItems, parent = None):
  35. items = [dict(x, parent=parent) for x in configItems if x.get('enabled') != False]
  36. middlewares = list(createModules(items, "middleware"))
  37. for (item, middleware) in zip(items, middlewares):
  38. if 'submodules' in item:
  39. middleware.submodules = list(createMiddlewares(item['submodules'], middleware))
  40. middleware.enable_output = item.get('enable_output', False)
  41. else:
  42. middleware.enable_output = item.get('enable_output', True)
  43. return middlewares
  44. middlewares = createMiddlewares(config['Middlewares'])
  45. # setup output modules
  46. outputs = list(createModules(config['Outputs'], "output"))
  47. for source in inputs:
  48. source.start()
  49. logger.debug("started sources")
  50. def executeMiddleware(middleware, values):
  51. submodules = getattr(middleware, 'submodules', [])
  52. result = middleware.execute(values)
  53. if not submodules and middleware.enable_output:
  54. print(middleware.enable_output, type(middleware).__name__, submodules, result)
  55. return result
  56. else:
  57. subResults = []
  58. for submodule in submodules:
  59. subResults += executeMiddleware(submodule, result)
  60. return subResults
  61. while True:
  62. values = set()
  63. for input in inputs:
  64. values.update(input.read())
  65. # sort the set by timestamp and series
  66. values = sorted(values, key=lambda x: (x.timestamp, x.series))
  67. # execute middlewares recursively and collect results of leaf modules
  68. results = set()
  69. for middleware in middlewares:
  70. tmp = executeMiddleware(middleware, values)
  71. if tmp:
  72. results.update(tmp)
  73. if not middlewares:
  74. results = values
  75. # sort the set by timestamp and series
  76. results = sorted(results, key=lambda x: (x.timestamp, x.series))
  77. for output in outputs:
  78. output.write(results)
  79. time.sleep(1.9)