snap7_server.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. import snap7
  2. import time
  3. import re
  4. from datetime import datetime
  5. from inputs.common import Input
  # Timezone of the host, resolved once when the module is imported; it is
  # used below to create timezone-aware timestamps for incoming events.
  # NOTE(review): astimezone() presumably yields a fixed-offset tzinfo, so a
  # DST switch while the process is running would not be reflected - confirm
  # whether that matters for this deployment.
  localtz = datetime.now().astimezone().tzinfo
  7. class SiemensCPU(Input):
  # Class-level setting; presumably the polling period in seconds that the
  # Input base class uses between read_handler calls - TODO confirm against
  # inputs.common.
  interval = 0.02
  def __init__(self):
      """Set up the emulated S7 server and the data block it exposes.

      Hands ``self.read_handler`` to the ``Input`` base constructor, creates
      a snap7 server, registers a 100-byte buffer (``self.DBdata``) as DB
      area number 1 and starts the server with argument 102 (the ISO-on-TCP
      port in python-snap7's ``Server.start`` - confirm against the library
      version in use).
      """
      super().__init__(self.read_handler)

      # NOTE(review): the positional True is presumably the "log" flag that
      # makes the server record events for pick_event() - confirm.
      self.server = snap7.server.Server(True)

      # Backing store for DB1: a ctypes array of 100 byte-sized elements.
      db_length = 100
      byte_ctype = snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value]
      db_array_type = byte_ctype * db_length
      self.DBdata = db_array_type()

      self.server.register_area(snap7.types.srvAreaDB, 1, self.DBdata)
      self.server.start(102)
  def read_handler(self):
      """Drain pending snap7 server events and queue data written to DB1.

      Each event returned by ``self.server.pick_event()`` is rendered to
      text with ``self.server.event_text()`` and parsed. Events whose text
      does not parse, or that are not a "Write request" on area "DB1", are
      printed and skipped. For a DB1 write, the registered buffer
      ``self.DBdata`` is copied and forwarded:

      * bytes 0-29 go to ``queue_ifm_from_bytes`` when the write starts
        below offset 30;
      * bytes 32-67 go to ``queue_energy_meter_from_bytes`` when the
        written range overlaps offsets 32-67.

      The timestamp attached to the data is the local time at which the
      event was picked up, not the time contained in the event text.
      Both queue methods are defined outside this block (presumably on
      the ``Input`` base class - confirm).
      """
      # Raw string literals: regex escapes such as \d, \w and \[ are not
      # valid Python string escapes and trigger a SyntaxWarning on
      # Python 3.12+ when written in a normal string. Compiled once per
      # call instead of being looked up for every event.
      event_pattern = re.compile(
          r"^(?P<datetime>\d+-\d+-\d+ \d+:\d+:\d+) "
          r"\[(?P<host>[\w\.:]+)\] "
          r"(?P<type>[\w ]+), "
          r"Area : (?P<area>.+), "
          r"Start : (?P<start>\d+), "
          r"Size : (?P<size>\d+) "
          r"--> (?P<response>.+)$"
      )

      event : snap7.types.SrvEvent
      while event := self.server.pick_event():
          timestamp = datetime.now(localtz)
          text = self.server.event_text(event)
          match = event_pattern.match(text)

          # Anything that is not a parsable write to DB1 is only echoed.
          if (
              not match
              or match.group("type") != "Write request"
              or match.group("area") != "DB1"
          ):
              print(text)
              continue

          # Copy the shared buffer so later writes cannot alter the data
          # that is being queued.
          raw = bytearray(self.DBdata)

          # The pattern guarantees both groups are decimal digits.
          start = int(match.group("start"))
          end = start + int(match.group("size"))

          if start < 30:
              self.queue_ifm_from_bytes("S7", timestamp, raw[0:30])
          if start < 68 and end > 32:
              self.queue_energy_meter_from_bytes("S7", timestamp, raw[32:68])