12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 |
- import csv
- import os
- from datetime import datetime
- import dataclasses
- from zipfile import ZipFile
- from structures.measurement import Measurement24v
- class CSVFile:
- path = "logs"
- file = None
- filename = None
- row_count = 0
- def __init__(self) -> None:
- if not os.path.exists(self.path):
- os.mkdir(self.path)
- self.new_file()
- def new_file(self):
- if self.file:
- self.file.close()
- with ZipFile(self.filename + ".zip", 'w') as zf:
- zf.write(self.filename, os.path.basename(self.filename))
- os.remove(self.filename)
-
- self.filename = os.path.join(self.path, F"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv")
- self.file = open(self.filename, "w", newline='')
- self.writer = csv.writer(self.file, delimiter=',')
- def write(self, values: list):
- try:
- meas: Measurement24v
- for i, meas in enumerate(values):
- row = dataclass_to_dict(meas)
- if self.row_count == 0:
- self.writer.writerow(row)
- self.writer.writerow(row.values())
- self.row_count += 1
- self.file.flush()
- if self.row_count > 50000:
- self.new_file()
- self.row_count = 0
- except Exception as ex:
- print("CSV write failed", ex)
- def dataclass_to_dict(dc):
- ret = {}
- for field in dataclasses.fields(dc):
- value = getattr(dc, field.name)
- if not type(value) is tuple:
- ret[field.name] = value
- else:
- for i, v in enumerate(value):
- ret[F"{field.name}_{i}"] = v
- return ret
|