csvFile.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import csv
  2. import os
  3. from datetime import datetime
  4. import dataclasses
  5. import zipfile
  6. import logging
  7. from structures.measurement import Measurement24v
  8. class CSVFile:
  9. file = None
  10. filename = None
  11. row_count = 0
  12. def __init__(self, path) -> None:
  13. self.path = path
  14. if not os.path.exists(self.path):
  15. os.mkdir(self.path)
  16. self.zipname = os.path.join(self.path, F"logs_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.zip")
  17. self.new_file()
  18. def new_file(self):
  19. if self.file:
  20. self.file.close()
  21. with zipfile.ZipFile(self.zipname, 'a', compression=zipfile.ZIP_BZIP2, compresslevel=9) as zf:
  22. zf.write(self.filename, os.path.basename(self.filename))
  23. os.remove(self.filename)
  24. self.filename = os.path.join(self.path, F"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv")
  25. self.file = open(self.filename, "w", newline='')
  26. self.writer = csv.writer(self.file, delimiter=',')
  27. def write(self, values: list):
  28. try:
  29. meas: Measurement24v
  30. for i, meas in enumerate(values):
  31. row = dataclass_to_dict(meas)
  32. if self.row_count == 0:
  33. self.writer.writerow(row)
  34. self.writer.writerow(row.values())
  35. self.row_count += 1
  36. self.file.flush()
  37. if self.row_count > 50000:
  38. self.new_file()
  39. self.row_count = 0
  40. except Exception as ex:
  41. logging.exception("CSV write failed")
  42. def dataclass_to_dict(dc):
  43. ret = {}
  44. for field in dataclasses.fields(dc):
  45. value = getattr(dc, field.name)
  46. if not type(value) is tuple:
  47. ret[field.name] = value
  48. else:
  49. for i, v in enumerate(value):
  50. ret[F"{field.name}_{i}"] = v
  51. return ret