瀏覽代碼

combine measurements of a single timestamp

Johannes Müller 3 年之前
父節點
當前提交
b3f400d9e1

+ 1 - 0
.gitignore

@@ -4,6 +4,7 @@
 *.out
 
 *.csv
+*.csv.zip
 
 ~$*
 __pycache__/

+ 47 - 21
box-pc/application/database/csvFile.py

@@ -1,32 +1,58 @@
 import csv
 import os
 from datetime import datetime
+import dataclasses
+from zipfile import ZipFile
 
-from structures.measurement import CurrentMeasurement
+from structures.measurement import Measurement24v
 
 class CSVFile:
-  lastMeas = [None] * 16
   path = "logs"
+  file = None
+  filename = None
+  row_count = 0
 
   def __init__(self) -> None:
-      if not os.path.exists(self.path):
-        os.mkdir(self.path)
-      filename = os.path.join(self.path, F"current_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv")
-      self.file = open(filename, "a", newline='')
-      self.writer = csv.writer(self.file, delimiter=' ')
+    if not os.path.exists(self.path):
+      os.mkdir(self.path)
+    self.new_file()
+
+  def new_file(self):
+
+    if self.file:
+      self.file.close()
+      with ZipFile(self.filename + ".zip", 'w') as zf:
+        zf.write(self.filename, os.path.basename(self.filename))
+      os.remove(self.filename)
+      
+    self.filename = os.path.join(self.path, F"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv")
+    self.file = open(self.filename, "w", newline='')
+    self.writer = csv.writer(self.file, delimiter=',')
 
   def write(self, values: list):
-    meas: CurrentMeasurement
-    for i, meas in enumerate(values):
-      prev : CurrentMeasurement = self.lastMeas[meas.channel]
-      if prev == None:
-        self.writer.writerow([meas.timestamp, meas.source, meas.channel, meas.current])
-        meas.saved = True
-      elif prev.current != meas.current:
-        if not prev.saved:
-          self.writer.writerow([prev.timestamp, prev.source, prev.channel, prev.current])
-        self.writer.writerow([meas.timestamp, meas.source, meas.channel, meas.current])
-        meas.saved = True
-
-      self.lastMeas[meas.channel] = meas
-      self.file.flush()
+    try:
+      meas: Measurement24v
+      for i, meas in enumerate(values):
+        row = dataclass_to_dict(meas)
+        if self.row_count == 0:
+          self.writer.writerow(row)
+        self.writer.writerow(row.values())
+        self.row_count += 1
+      self.file.flush()
+
+      if self.row_count > 50000:
+        self.new_file()
+        self.row_count = 0
+    except Exception as ex:
+      print("CSV write failed", ex)
+
+def dataclass_to_dict(dc):
+  ret = {}
+  for field in dataclasses.fields(dc):
+    value = getattr(dc, field.name)
+    if not type(value) is tuple:
+      ret[field.name] = value
+    else:
+      for i, v in enumerate(value):
+        ret[F"{field.name}_{i}"] = v
+  return ret

+ 14 - 3
box-pc/application/database/influxdb.py

@@ -1,5 +1,6 @@
 from influxdb_client import InfluxDBClient, Point
 from influxdb_client.client.write_api import SYNCHRONOUS
+import dataclasses
 
 class InfluxDB:
   def __init__(self):
@@ -13,7 +14,17 @@ class InfluxDB:
   def write(self, values):
     points = []
     for meas in values:
-      p = Point("24v").time(meas.timestamp).tag("source", meas.source).tag("channel", meas.channel).field("current", meas.current)
+      p = Point(meas.series).time(meas.timestamp).tag("source", meas.source)
+      for field in dataclasses.fields(meas):
+        if not field.name in ["timestamp", "series", "source"]:
+          value = getattr(meas, field.name)
+          if not type(value) is tuple:
+            p.field(field.name, value)
+          else:
+            for i, v in enumerate(value):
+              p.field(F"{field.name}_{i}", v)
       points.append(p)
-      
-    self.write_api.write(bucket=self.bucket, record=points)
+    try:
+      self.write_api.write(bucket=self.bucket, record=points)
+    except Exception as ex:
+      print("Influx DB write failed", ex)

+ 1 - 3
box-pc/application/database/sqliteDb.py

@@ -1,7 +1,5 @@
 import sqlite3
 
-from structures.measurement import CurrentMeasurement
-
 class SqliteDB:
   def __init__(self):
     self.con = sqlite3.connect('sqlite3.db')
@@ -10,6 +8,6 @@ class SqliteDB:
       cur.execute("CREATE TABLE current (timestamp text, source text, channel int, value real")
       cur.commit()
 
-  def write(self, meas: CurrentMeasurement):
+  def write(self, meas):
     with self.con.cursor() as cur:
       cur.execute("INSERT INTO current VALUES (")

+ 4 - 13
box-pc/application/inputs/allen_bradley_connect.py

@@ -2,9 +2,8 @@ from pylogix import PLC
 from threading import Thread
 import time
 from datetime import datetime
-import struct
 
-from structures.measurement import CurrentMeasurement
+
 from inputs.common import Input
 
 localtz = datetime.now().astimezone().tzinfo
@@ -28,14 +27,6 @@ class AllenBradleyCPU(Input):
   def read_handler(self):
     timestamp = datetime.now(localtz)
     ret = self.comm.Read(F"{self.tag}:I")
-    raw = ret.Value
-    data = struct.unpack(">" + "B" * 16 + "HHHHHBxH", raw[self.E_offset:self.E_offset+30])
-
-    for i in range(16):
-      self._q.put(CurrentMeasurement(timestamp, "AB", i, data[i]))
-
-if __name__ == "__main__":
-  with AllenBradleyCPU() as cpu:
-    cpu.start()
-    while True:
-      time.sleep(1)
+    if ret.Status == "Success":
+      raw = ret.Value
+      self.queue_ifm_from_bytes(timestamp, raw[self.E_offset:self.E_offset+30])

+ 12 - 26
box-pc/application/inputs/balluff_html.py

@@ -3,48 +3,34 @@ import time
 import requests, json
 import re
 
-from structures.measurement import CurrentMeasurement
+from inputs.common import Input
 
-class Balluff():
+class Balluff(Input):
 
   cpu_start_time = None
   cpu_last_time = None
   local_start_time = time.time()
   db = 1
   interval = 0.05
-  url = "http://10.0.10.4/ports.jsn"
+  url = "http://192.168.10.20/ports.jsn"
   port = 0
 
-  def read(self):
+  def __init__(self):
+    super().__init__(self.read_handler)
+
+  def read_handler(self):
     try:
       req = requests.get(self.url)
     except requests.exceptions.ConnectionError:
-      return []
+      return
+
     timestamp = datetime.utcnow()
     response = json.loads(req.text)
     if not re.match("^DF210[01]$", response['ports'][self.port]['productId']):
       raise Exception("unsupported device " + response['ports'][self.port]['productId'])
 
     data = response['ports'][self.port]['processInputs'].split(" ")
-    data = [int(x, 16) for x in data]
-    status = data[16] << 8 | data[17]
-    overload = data[18] << 8 | data[19]
-    short = data[20] << 8 | data[21]
-    limit = data[22] << 8 | data[23]
-    buttons = data[24] << 8 | data[25]
-    undervoltage = data[26] & 1
-    voltage = data[28] << 8 | data[29]
-
-    points = []
-    for i in range(16):
-      if status & (1 << i):
-        points.append(CurrentMeasurement(timestamp, "IO", i, data[i] / 10))
-
-    return points
-
-  def read_continous(self):
-    while True:
-      points = self.read()
-      for point in points:
-        yield point
+    data = bytes([int(x, 16) for x in data])
+    
+    self.queue_ifm_from_bytes(timestamp, data)
       

+ 15 - 1
box-pc/application/inputs/common.py

@@ -1,6 +1,9 @@
 from threading import Thread
 from queue import Queue
 import time
+import struct
+
+from structures.measurement import Measurement24v
 
 class Input:
   _t = None
@@ -37,4 +40,15 @@ class Input:
       end_time = time.monotonic()
       remaining = self.interval + start_time - end_time
       if remaining > 0:
-        time.sleep(remaining)
+        time.sleep(remaining)
+
+  def queue_ifm_from_bytes(self, timestamp, raw, channels = 16):
+    data = struct.unpack(">" + "B" * 16 + "HHHHHBxH", raw)
+    current = tuple([x / 10 for x in data[0:channels]])
+    status = tuple([data[17] & (1 << i) for i in range(channels)])
+    overload = tuple([data[17] & (1 << i) for i in range(channels)])
+    short_circuit = tuple([data[17] & (1 << i) for i in range(channels)])
+    limit = tuple([data[17] & (1 << i) for i in range(channels)])
+    pushbutton = tuple([data[17] & (1 << i) for i in range(channels)])
+    voltage = data[22] / 100
+    self._q.put(Measurement24v(timestamp, "AB", current, status, overload, short_circuit, limit, pushbutton, voltage))

+ 2 - 2
box-pc/application/inputs/snap7_connect.py

@@ -3,7 +3,7 @@ import time
 from datetime import datetime
 import struct
 
-from structures.measurement import CurrentMeasurement
+from structures.measurement import Measurement24v
 from inputs.common import Input
 
 localtz = datetime.now().astimezone().tzinfo
@@ -38,7 +38,7 @@ class SiemensCPU(Input):
     inc_time = (cpu_time - self.cpu_last_time) / self.cpu_db_value_count
     for i, val in enumerate(data[1:]):
       timestamp = self.get_timestamp(self.cpu_last_time + inc_time * (i+1))
-      self._q.put(CurrentMeasurement(timestamp, "S7", 0, val / 10))
+      self._q.put(Measurement(timestamp, "S7", "24v_current", 0, val / 10))
     self.cpu_last_time = cpu_time
 
   def get_timestamp(self, cpu_time):

+ 5 - 1
box-pc/application/main.py

@@ -17,9 +17,13 @@ balluff = Balluff()
 ab = AllenBradleyCPU()
 ab.start()
 
+start = 0
+
 while True:
   values = list(ab.read())
-  print(len(values))
+  dt = time.monotonic() - start
+  start = time.monotonic()
   db1.write(values)
   db2.write(values)
+  print("{:4d} Messungen in {:.03f} s, {:.3f} pro Sekunde".format(len(values), dt, len(values) / dt), end='\r')
   time.sleep(1)

+ 18 - 4
box-pc/application/structures/measurement.py

@@ -3,9 +3,23 @@ from datetime import datetime
 import string
 
 @dataclass
-class CurrentMeasurement:
+class Measurement24v:
   timestamp: datetime
   source: string
-  channel: int
-  current: float
-  saved: bool = False
+  current: tuple[float, ...]
+  status: int
+  overload: int
+  short_circuit: int
+  limit: int
+  pushbutton: int
+  voltage: float
+  series = "24v"
+
+@dataclass
+class Measurement480v:
+  timestamp: datetime
+  source: string
+  current: tuple[float, ...]
+  voltage: tuple[float, ...]
+  wattage: tuple[float, ...]
+  series = "480v"