123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- import csv
- import os
- from datetime import datetime
- import dataclasses
- import zipfile
- import logging
- from structures.measurement import Measurement24v
- class CSVFile:
- file = None
- filename = None
- row_count = 0
- def __init__(self, path) -> None:
- self.path = path
- if not os.path.exists(self.path):
- os.mkdir(self.path)
- self.zipname = os.path.join(self.path, F"logs_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.zip")
- self.new_file()
- def new_file(self):
- if self.file:
- self.file.close()
- with zipfile.ZipFile(self.zipname, 'a', compression=zipfile.ZIP_BZIP2, compresslevel=9) as zf:
- zf.write(self.filename, os.path.basename(self.filename))
- os.remove(self.filename)
-
- self.filename = os.path.join(self.path, F"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv")
- self.file = open(self.filename, "w", newline='')
- self.writer = csv.writer(self.file, delimiter=',')
- def write(self, values: list):
- try:
- meas: Measurement24v
- for i, meas in enumerate(values):
- row = dataclass_to_dict(meas)
- if self.row_count == 0:
- self.writer.writerow(row)
- self.writer.writerow(row.values())
- self.row_count += 1
- self.file.flush()
- if self.row_count > 50000:
- self.new_file()
- self.row_count = 0
- except Exception as ex:
- logging.exception("CSV write failed")
- def dataclass_to_dict(dc):
- ret = {}
- for field in dataclasses.fields(dc):
- value = getattr(dc, field.name)
- if not type(value) is tuple:
- ret[field.name] = value
- else:
- for i, v in enumerate(value):
- ret[F"{field.name}_{i}"] = v
- return ret
|