snap7_server.py 896 B

12345678910111213141516171819202122232425262728
  1. import snap7
  2. import time
  3. import re
  4. from datetime import datetime
  5. from inputs.common import Input
  6. class SiemensCPU(Input):
  7. interval = 0.02
  def __init__(self, *, port: int = 1102, db_number: int = 1, db_size: int = 100) -> None:
      """Create a snap7 server, register one DB area, and start listening.

      All parameters are keyword-only and default to the values that were
      previously hard-coded, so ``SiemensCPU()`` behaves exactly as before.

      Args:
          port: TCP port passed to ``Server.start``.
          db_number: Index under which the DB area is registered.
          db_size: Number of byte elements in the buffer backing the DB area.
      """
      # The base class is handed the handler callable; how and when it invokes
      # it is decided by Input (presumably every `interval` seconds -- confirm).
      super().__init__(self.read_handler)

      # The positional True is kept from the original; in python-snap7 this is
      # presumably the `log` flag -- confirm against the installed version.
      self.server = snap7.server.Server(True)

      # Build a ctypes array of `db_size` byte-sized elements. It is stored on
      # the instance (as before) so the registered buffer stays referenced.
      byte_ctype = snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value]
      self.DBdata = (byte_ctype * db_size)()

      self.server.register_area(snap7.types.srvAreaDB, db_number, self.DBdata)
      self.server.start(port)
  15. def read_handler(self):
  16. event : snap7.types.SrvEvent
  17. while event := self.server.pick_event():
  18. timestamp = datetime.now()
  19. text = self.server.event_text(event)
  20. match = re.match("^(?P<datetime>\d+-\d+-\d+ \d+:\d+:\d+) \[(?P<host>[\w\.:]+)\] (?P<type>[\w ]+), Area : (?P<area>.+), Start : (?P<start>\d+), Size : (?P<size>\d+) --> (?P<response>.+)$", text)
  21. if not match:
  22. print(text)
  23. continue