csvFile.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. import csv
  2. import os
  3. from datetime import datetime
  4. import dataclasses
  5. import zipfile
  6. from structures.measurement import Measurement24v
  7. class CSVFile:
  8. path = "logs"
  9. file = None
  10. filename = None
  11. row_count = 0
  12. def __init__(self) -> None:
  13. if not os.path.exists(self.path):
  14. os.mkdir(self.path)
  15. self.zipname = os.path.join(self.path, F"logs_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.zip")
  16. self.new_file()
  17. def new_file(self):
  18. if self.file:
  19. self.file.close()
  20. with zipfile.ZipFile(self.zipname, 'a', compression=zipfile.ZIP_BZIP2, compresslevel=9) as zf:
  21. zf.write(self.filename, os.path.basename(self.filename))
  22. os.remove(self.filename)
  23. self.filename = os.path.join(self.path, F"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv")
  24. self.file = open(self.filename, "w", newline='')
  25. self.writer = csv.writer(self.file, delimiter=',')
  26. def write(self, values: list):
  27. try:
  28. meas: Measurement24v
  29. for i, meas in enumerate(values):
  30. row = dataclass_to_dict(meas)
  31. if self.row_count == 0:
  32. self.writer.writerow(row)
  33. self.writer.writerow(row.values())
  34. self.row_count += 1
  35. self.file.flush()
  36. if self.row_count > 50000:
  37. self.new_file()
  38. self.row_count = 0
  39. except Exception as ex:
  40. print("CSV write failed", ex)
  41. def dataclass_to_dict(dc):
  42. ret = {}
  43. for field in dataclasses.fields(dc):
  44. value = getattr(dc, field.name)
  45. if not type(value) is tuple:
  46. ret[field.name] = value
  47. else:
  48. for i, v in enumerate(value):
  49. ret[F"{field.name}_{i}"] = v
  50. return ret