|
@@ -1,10 +1,28 @@
|
|
|
import snap7
|
|
|
import time
|
|
|
-import ctypes
|
|
|
+import re
|
|
|
+from datetime import datetime
|
|
|
|
|
|
-server = snap7.server.Server()
|
|
|
+from inputs.common import Input
|
|
|
|
|
|
-data = ctypes.c_buffer(0, 12)
|
|
|
+class SiemensCPU(Input):
|
|
|
+ interval = 0.02
|
|
|
|
|
|
-server.register_area(1, 0, data)
|
|
|
-snap7.server.mainloop(102)
|
|
|
+ def __init__(self):
|
|
|
+ super().__init__(self.read_handler)
|
|
|
+ self.server = snap7.server.Server(True)
|
|
|
+ size = 100
|
|
|
+ self.DBdata = (snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value] * size)()
|
|
|
+ self.server.register_area(snap7.types.srvAreaDB, 1, self.DBdata)
|
|
|
+ self.server.start(1102)
|
|
|
+
|
|
|
+ def read_handler(self):
|
|
|
+ event : snap7.types.SrvEvent
|
|
|
+ while event := self.server.pick_event():
|
|
|
+ timestamp = datetime.now()
|
|
|
+ text = self.server.event_text(event)
|
|
|
+ match = re.match("^(?P<datetime>\d+-\d+-\d+ \d+:\d+:\d+) \[(?P<host>[\w\.:]+)\] (?P<type>[\w ]+), Area : (?P<area>.+), Start : (?P<start>\d+), Size : (?P<size>\d+) --> (?P<response>.+)$", text)
|
|
|
+ if not match:
|
|
|
+ print(text)
|
|
|
+ continue
|
|
|
+
|