import csv
import dataclasses
import logging
import os
import zipfile
from datetime import datetime

from structures.measurement import Measurement24v

# Timestamp layout shared by the archive name and every CSV file name.
_TIMESTAMP_FORMAT = '%Y-%m-%d_%H-%M-%S'


class CSVFile:
    """Write dataclass rows to CSV files and rotate full files into a ZIP.

    Exactly one CSV file is open at a time inside ``path``. After a call to
    :meth:`write` leaves more than ``max_rows`` rows in the current file, the
    file is closed, appended to a BZIP2-compressed ZIP archive in the same
    directory, deleted from disk, and replaced by a new CSV file named after
    the current timestamp.
    """

    file = None       # currently open CSV file object (None before the first new_file())
    filename = None   # path of the currently open CSV file
    row_count = 0     # data rows written to the current file (header not counted)

    def __init__(self, path, max_rows: int = 50000) -> None:
        """Create the output directory if needed and open the first CSV file.

        Args:
            path: Directory that receives the CSV files and the ZIP archive.
            max_rows: Row count above which the current file is rotated.
        """
        self.path = path
        self.max_rows = max_rows
        # makedirs(exist_ok=True) also creates missing parents and does not
        # fail if the directory appears between a check and the creation.
        os.makedirs(self.path, exist_ok=True)
        stamp = datetime.now().strftime(_TIMESTAMP_FORMAT)
        self.zipname = os.path.join(self.path, f"logs_{stamp}.zip")
        self.new_file()

    def new_file(self) -> None:
        """Archive the current CSV file (if any) and open a fresh one.

        Raises:
            Exception: whatever closing, zipping or removing the previous
                file raises. A fresh file is still opened in that case, so
                the instance stays usable; the un-archived CSV file is left
                on disk.
        """
        try:
            self._archive_current_file()
        finally:
            # Runs even when archiving failed: otherwise self.file would
            # remain a closed handle and every later write() would fail.
            stamp = datetime.now().strftime(_TIMESTAMP_FORMAT)
            self.filename = os.path.join(self.path, f"{stamp}.csv")
            self.file = open(self.filename, "w", newline='')
            self.writer = csv.writer(self.file, delimiter=',')
            # A new file has no rows yet; resetting here makes write() emit
            # the header again regardless of how the rotation was triggered.
            self.row_count = 0

    def _archive_current_file(self) -> None:
        """Close the open CSV file, append it to the ZIP archive, delete it."""
        if not self.file:
            return
        self.file.close()
        with zipfile.ZipFile(self.zipname, 'a', compression=zipfile.ZIP_BZIP2,
                             compresslevel=9) as archive:
            # Store under the bare file name so the archive has no directories.
            archive.write(self.filename, os.path.basename(self.filename))
        # Only reached when the archive was written and closed successfully.
        os.remove(self.filename)

    def write(self, values: list) -> None:
        """Append one CSV row per dataclass instance in ``values``.

        The header row (flattened field names) is written before the first
        data row of each file. The file is flushed once per call, and the
        rotation threshold is checked once per call, so a file can exceed
        ``max_rows`` by up to one batch.

        Errors are logged and not re-raised, so a failing write does not
        propagate to the caller.

        Args:
            values: Iterable of dataclass instances to serialise.
        """
        try:
            meas: Measurement24v
            for meas in values:
                row = dataclass_to_dict(meas)
                if self.row_count == 0:
                    self.writer.writerow(row.keys())
                self.writer.writerow(row.values())
                self.row_count += 1
            self.file.flush()
            if self.row_count > self.max_rows:
                self.new_file()
        except Exception:
            # Deliberate best-effort boundary: record the traceback and carry on.
            logging.exception("CSV write failed")


def dataclass_to_dict(dc) -> dict:
    """Flatten a dataclass instance into a one-level ``{column: value}`` dict.

    Fields whose value is a plain ``tuple`` are expanded into one entry per
    element, keyed ``"<field>_<index>"``; every other value is stored under
    the field name unchanged. Entry order follows the dataclass field order.

    Args:
        dc: A dataclass instance (or class) accepted by ``dataclasses.fields``.

    Returns:
        The flattened mapping.
    """
    flat = {}
    for field in dataclasses.fields(dc):
        value = getattr(dc, field.name)
        # Exact type check: tuple subclasses (e.g. namedtuples) are kept as a
        # single value, which preserves the existing column layout.
        if type(value) is not tuple:
            flat[field.name] = value
            continue
        for index, item in enumerate(value):
            flat[f"{field.name}_{index}"] = item
    return flat